diff --git a/cmake/define.inc b/cmake/define.inc
index 8d6a398709..93bf602610 100755
--- a/cmake/define.inc
+++ b/cmake/define.inc
@@ -28,3 +28,7 @@ ENDIF ()
IF (TD_RANDOM_FILE_FAIL)
ADD_DEFINITIONS(-DTAOS_RANDOM_FILE_FAIL)
ENDIF ()
+
+IF (TD_RANDOM_NETWORK_FAIL)
+ ADD_DEFINITIONS(-DTAOS_RANDOM_NETWORK_FAIL)
+ENDIF ()
diff --git a/cmake/input.inc b/cmake/input.inc
index 574eac5b45..e963e20240 100755
--- a/cmake/input.inc
+++ b/cmake/input.inc
@@ -36,3 +36,8 @@ IF (${RANDOM_FILE_FAIL} MATCHES "true")
SET(TD_RANDOM_FILE_FAIL TRUE)
MESSAGE(STATUS "build with random-file-fail enabled")
ENDIF ()
+
+IF (${RANDOM_NETWORK_FAIL} MATCHES "true")
+ SET(TD_RANDOM_NETWORK_FAIL TRUE)
+ MESSAGE(STATUS "build with random-network-fail enabled")
+ENDIF ()
diff --git a/src/client/inc/tscLog.h b/src/client/inc/tscLog.h
index c395951742..94adcfe17a 100644
--- a/src/client/inc/tscLog.h
+++ b/src/client/inc/tscLog.h
@@ -31,9 +31,7 @@ extern int32_t tscEmbedded;
#define tscInfo(...) { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC INFO ", tscEmbedded ? 255 : cDebugFlag, __VA_ARGS__); }}
#define tscDebug(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
#define tscTrace(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC TRACE ", cDebugFlag, __VA_ARGS__); }}
-
-#define tscDebugDump(...) { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
-#define tscTraceDump(...) { if (cDebugFlag & DEBUG_TRACE) { taosPrintLongString("TSC TRACE ", cDebugFlag, __VA_ARGS__); }}
+#define tscDebugL(...){ if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC DEBUG ", cDebugFlag, __VA_ARGS__); }}
#ifdef __cplusplus
}
diff --git a/src/client/src/TSDBJNIConnector.c b/src/client/src/TSDBJNIConnector.c
index d070fad11b..eb9b1cb479 100644
--- a/src/client/src/TSDBJNIConnector.c
+++ b/src/client/src/TSDBJNIConnector.c
@@ -29,9 +29,6 @@
#define jniDebug(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLog("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
#define jniTrace(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLog("JNI TRACE ", jniDebugFlag, __VA_ARGS__); }}
-#define jniDebugDump(...) { if (jniDebugFlag & DEBUG_DEBUG) { taosPrintLongString("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
-#define jniTraceDump(...) { if (jniDebugFlag & DEBUG_TRACE) { taosPrintLongString("JNI DEBUG ", jniDebugFlag, __VA_ARGS__); }}
-
int __init = 0;
JavaVM *g_vm = NULL;
diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c
index 1ac013d51e..da24e3691a 100644
--- a/src/client/src/tscAsync.c
+++ b/src/client/src/tscAsync.c
@@ -55,7 +55,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
strtolower(pSql->sqlstr, sqlstr);
- tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr);
+ tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
pSql->cmd.curSql = pSql->sqlstr;
int32_t code = tsParseSql(pSql, true);
diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c
index 8919d872a6..13523818d1 100644
--- a/src/client/src/tscLocalMerge.c
+++ b/src/client/src/tscLocalMerge.c
@@ -364,7 +364,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
TSKEY stime = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t revisedSTime =
- taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
+ taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
SFillColInfo* pFillCol = createFillColInfo(pQueryInfo);
@@ -831,7 +831,7 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
if (pFillInfo != NULL) {
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
int64_t revisedSTime =
- taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
+ taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
taosResetFillInfo(pFillInfo, revisedSTime);
}
@@ -1301,9 +1301,7 @@ static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
TSKEY skey = MIN(pQueryInfo->window.skey, pQueryInfo->window.ekey);
int64_t newTime =
- taosGetIntervalStartTimestamp(skey, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
-// taosResetFillInfo(pLocalReducer->pFillInfo, pQueryInfo->order.order, newTime,
-// pQueryInfo->groupbyExpr.numOfGroupCols, 4096, 0, NULL, pLocalReducer->rowSize);
+ taosGetIntervalStartTimestamp(skey, pQueryInfo->slidingTime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, precision);
taosResetFillInfo(pLocalReducer->pFillInfo, newTime);
}
}
diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c
index 22b6be1c57..9f0d1a26ab 100644
--- a/src/client/src/tscPrepare.c
+++ b/src/client/src/tscPrepare.c
@@ -538,7 +538,7 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pRes->numOfRows = 1;
strtolower(pSql->sqlstr, sql);
- tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr);
+ tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
if (tscIsInsertData(pSql->sqlstr)) {
pStmt->isInsert = true;
diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index 2b325afa7c..22e91bd658 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -4487,10 +4487,12 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SUpdateTableTagValMsg* pUpdateMsg = (SUpdateTableTagValMsg*) pCmd->payload;
pUpdateMsg->head.vgId = htonl(pTableMeta->vgroupInfo.vgId);
- pUpdateMsg->tid = htonl(pTableMeta->sid);
- pUpdateMsg->uid = htobe64(pTableMeta->uid);
- pUpdateMsg->colId = htons(pTagsSchema->colId);
- pUpdateMsg->tversion = htons(pTableMeta->tversion);
+ pUpdateMsg->tid = htonl(pTableMeta->sid);
+ pUpdateMsg->uid = htobe64(pTableMeta->uid);
+ pUpdateMsg->colId = htons(pTagsSchema->colId);
+ pUpdateMsg->type = htons(pTagsSchema->type);
+ pUpdateMsg->bytes = htons(pTagsSchema->bytes);
+ pUpdateMsg->tversion = htons(pTableMeta->tversion);
pUpdateMsg->numOfTags = htons(numOfTags);
pUpdateMsg->schemaLen = htonl(schemaLen);
diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c
index a6d111dbc7..cc703e0f93 100644
--- a/src/client/src/tscStream.c
+++ b/src/client/src/tscStream.c
@@ -503,7 +503,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
strtolower(pSql->sqlstr, sqlstr);
- tscDebugDump("%p SQL: %s", pSql, pSql->sqlstr);
+ tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
int32_t code = tsParseSql(pSql, true);
diff --git a/src/common/inc/tname.h b/src/common/inc/tname.h
index 10d725db32..25f78cde7e 100644
--- a/src/common/inc/tname.h
+++ b/src/common/inc/tname.h
@@ -29,4 +29,6 @@ bool tscValidateTableNameLength(size_t len);
SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numOfFilters);
+int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision);
+
#endif // TDENGINE_NAME_H
diff --git a/src/common/inc/tulog.h b/src/common/inc/tulog.h
index 63c838be69..6365b21ef9 100644
--- a/src/common/inc/tulog.h
+++ b/src/common/inc/tulog.h
@@ -32,9 +32,6 @@ extern int32_t tscEmbedded;
#define uDebug(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLog("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }}
#define uTrace(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLog("UTL TRACE ", uDebugFlag, __VA_ARGS__); }}
-#define uDebugDump(...) { if (uDebugFlag & DEBUG_DEBUG) { taosPrintLongString("UTL DEBUG ", uDebugFlag, __VA_ARGS__); }}
-#define uTraceDump(...) { if (uDebugFlag & DEBUG_TRACE) { taosPrintLongString("UTL TRACE ", uDebugFlag, __VA_ARGS__); }}
-
#define pError(...) { taosPrintLog("APP ERROR ", 255, __VA_ARGS__); }
#define pPrint(...) { taosPrintLog("APP INFO ", 255, __VA_ARGS__); }
diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c
index 67c104878a..684fb71af9 100644
--- a/src/common/src/tglobal.c
+++ b/src/common/src/tglobal.c
@@ -1210,7 +1210,7 @@ void taosInitGlobalCfg() {
}
bool taosCheckGlobalCfg() {
- if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG) {
+ if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) {
taosSetAllDebugFlag();
}
diff --git a/src/common/src/tname.c b/src/common/src/tname.c
index 295015d466..b3ff15f936 100644
--- a/src/common/src/tname.c
+++ b/src/common/src/tname.c
@@ -75,3 +75,33 @@ SColumnFilterInfo* tscFilterInfoClone(const SColumnFilterInfo* src, int32_t numO
return pFilter;
}
+
+int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) {
+ if (slidingTime == 0) {
+ return startTime;
+ }
+
+ int64_t start = ((startTime - intervalTime) / slidingTime + 1) * slidingTime;
+ if (!(timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h')) {
+ /*
+ * here we revised the start time of day according to the local time zone,
+ * but in case of DST, the start time of one day need to be dynamically decided.
+ */
+ // todo refactor to extract function that is available for Linux/Windows/Mac platform
+#if defined(WINDOWS) && _MSC_VER >= 1900
+ // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
+ int64_t timezone = _timezone;
+ int32_t daylight = _daylight;
+ char** tzname = _tzname;
+#endif
+
+ int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
+ start += timezone * t;
+ }
+
+ int64_t end = start + intervalTime - 1;
+ if (end < startTime) {
+ start += slidingTime;
+ }
+ return start;
+}
diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c
index 1ae1287888..9c85d4341b 100644
--- a/src/dnode/src/dnodeMgmt.c
+++ b/src/dnode/src/dnodeMgmt.c
@@ -106,6 +106,12 @@ int32_t dnodeInitMgmt() {
}
}
+ int32_t code = vnodeInitResources();
+ if (code != TSDB_CODE_SUCCESS) {
+ dnodeCleanupMgmt();
+ return -1;
+ }
+
// create the queue and thread to handle the message
tsMgmtQset = taosOpenQset();
if (tsMgmtQset == NULL) {
@@ -127,7 +133,7 @@ int32_t dnodeInitMgmt() {
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
- int32_t code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
+ code = pthread_create(&tsQthread, &thAttr, dnodeProcessMgmtQueue, NULL);
pthread_attr_destroy(&thAttr);
if (code != 0) {
dError("failed to create thread to process mgmt queue, reason:%s", strerror(errno));
@@ -282,13 +288,12 @@ static void *dnodeOpenVnode(void *param) {
}
static int32_t dnodeOpenVnodes() {
- int32_t *vnodeList = calloc(TSDB_MAX_VNODES, sizeof(int32_t));
+ int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = dnodeGetVnodeList(vnodeList, &numOfVnodes);
if (status != TSDB_CODE_SUCCESS) {
dInfo("get dnode list failed");
- free(vnodeList);
return status;
}
@@ -334,7 +339,6 @@ static int32_t dnodeOpenVnodes() {
free(pThread->vnodeList);
}
- free(vnodeList);
free(threads);
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
@@ -342,7 +346,7 @@ static int32_t dnodeOpenVnodes() {
}
void dnodeStartStream() {
- int32_t vnodeList[TSDB_MAX_VNODES];
+ int32_t vnodeList[TSDB_MAX_VNODES] = {0};
int32_t numOfVnodes = 0;
int32_t status = vnodeGetVnodeList(vnodeList, &numOfVnodes);
@@ -359,7 +363,7 @@ void dnodeStartStream() {
}
static void dnodeCloseVnodes() {
- int32_t vnodeList[TSDB_MAX_VNODES];
+ int32_t vnodeList[TSDB_MAX_VNODES]= {0};
int32_t numOfVnodes = 0;
int32_t status;
diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c
index b20e6c9749..d439dd985e 100644
--- a/src/dnode/src/dnodeVWrite.c
+++ b/src/dnode/src/dnodeVWrite.c
@@ -247,6 +247,11 @@ static void *dnodeProcessWriteQueue(void *param) {
if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item;
dnodeSendRpcVnodeWriteRsp(pVnode, item, pWrite->rpcMsg.code);
+ } else if (type == TAOS_QTYPE_FWD) {
+ pHead = (SWalHead *)item;
+ vnodeConfirmForward(pVnode, pHead->version, 0);
+ taosFreeQitem(item);
+ vnodeRelease(pVnode);
} else {
taosFreeQitem(item);
vnodeRelease(pVnode);
diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h
index 8b856ccdd9..65cabe6bc1 100644
--- a/src/inc/taosmsg.h
+++ b/src/inc/taosmsg.h
@@ -284,6 +284,8 @@ typedef struct {
int32_t tid;
int16_t tversion;
int16_t colId;
+ int16_t type;
+ int16_t bytes;
int32_t tagValLen;
int16_t numOfTags;
int32_t schemaLen;
diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h
index ec981a7962..a599f100f9 100644
--- a/src/inc/tsdb.h
+++ b/src/inc/tsdb.h
@@ -108,7 +108,9 @@ void tsdbClearTableCfg(STableCfg *config);
void* tsdbGetTableTagVal(const void* pTable, int32_t colId, int16_t type, int16_t bytes);
char* tsdbGetTableName(void *pTable);
-STableId tsdbGetTableId(void *pTable);
+
+#define TSDB_TABLEID(_table) ((STableId*) (_table))
+
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
diff --git a/src/inc/vnode.h b/src/inc/vnode.h
index a034bc5706..1e6cfa9700 100644
--- a/src/inc/vnode.h
+++ b/src/inc/vnode.h
@@ -60,7 +60,10 @@ void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *param);
+void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes);
+
+int32_t vnodeInitResources();
void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c
index 9a041aa4fd..b763c62640 100644
--- a/src/mnode/src/mnodeVgroup.c
+++ b/src/mnode/src/mnodeVgroup.c
@@ -165,6 +165,8 @@ static int32_t mnodeVgroupActionUpdate(SSdbOper *pOper) {
}
mnodeDecDnodeRef(pDnode);
}
+
+ free(pNew);
}
mnodeVgroupUpdateIdPool(pVgroup);
diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h
index 58e255f7bc..00b9f33f1b 100644
--- a/src/os/linux/inc/os.h
+++ b/src/os/linux/inc/os.h
@@ -86,9 +86,28 @@ extern "C" {
} \
}
+#ifdef TAOS_RANDOM_NETWORK_FAIL
+
+ssize_t taos_send_random_fail(int sockfd, const void *buf, size_t len, int flags);
+
+ssize_t taos_sendto_random_fail(int sockfd, const void *buf, size_t len, int flags,
+ const struct sockaddr *dest_addr, socklen_t addrlen);
+ssize_t taos_read_random_fail(int fd, void *buf, size_t count);
+ssize_t taos_write_random_fail(int fd, const void *buf, size_t count);
+
+#define send(sockfd, buf, len, flags) taos_send_random_fail(sockfd, buf, len, flags)
+#define sendto(sockfd, buf, len, flags, dest_addr, addrlen) \
+ taos_sendto_random_fail(sockfd, buf, len, flags, dest_addr, addrlen)
+#define taosWriteSocket(fd, buf, len) taos_write_random_fail(fd, buf, len)
+#define taosReadSocket(fd, buf, len) taos_read_random_fail(fd, buf, len)
+
+#else
+
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
+#endif /* TAOS_RANDOM_NETWORK_FAIL */
+
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
diff --git a/src/os/linux/src/linuxPlatform.c b/src/os/linux/src/linuxPlatform.c
index 9a38c98f81..216d8942bc 100644
--- a/src/os/linux/src/linuxPlatform.c
+++ b/src/os/linux/src/linuxPlatform.c
@@ -270,3 +270,49 @@ int tSystem(const char * cmd)
}
}
+#ifdef TAOS_RANDOM_NETWORK_FAIL
+
+#define RANDOM_NETWORK_FAIL_FACTOR 20
+
+ssize_t taos_send_random_fail(int sockfd, const void *buf, size_t len, int flags)
+{
+ if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ return send(sockfd, buf, len, flags);
+}
+
+ssize_t taos_sendto_random_fail(int sockfd, const void *buf, size_t len, int flags,
+ const struct sockaddr *dest_addr, socklen_t addrlen)
+{
+ if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ return sendto(sockfd, buf, len, flags, dest_addr, addrlen);
+}
+
+ssize_t taos_read_random_fail(int fd, void *buf, size_t count)
+{
+ if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ return read(fd, buf, count);
+}
+
+ssize_t taos_write_random_fail(int fd, const void *buf, size_t count)
+{
+ if (rand() % RANDOM_NETWORK_FAIL_FACTOR == 0) {
+ errno = EINTR;
+ return -1;
+ }
+
+ return write(fd, buf, count);
+}
+
+#endif /* TAOS_RANDOM_NETWORK_FAIL */
diff --git a/src/os/linux/src/linuxSysPara.c b/src/os/linux/src/linuxSysPara.c
index c2134765df..1331503619 100644
--- a/src/os/linux/src/linuxSysPara.c
+++ b/src/os/linux/src/linuxSysPara.c
@@ -160,7 +160,7 @@ static void taosGetSystemTimezone() {
/* load time zone string from /etc/timezone */
FILE *f = fopen("/etc/timezone", "r");
- char buf[65] = {0};
+ char buf[68] = {0};
if (f != NULL) {
int len = fread(buf, 64, 1, f);
if(len < 64 && ferror(f)) {
@@ -170,18 +170,17 @@ static void taosGetSystemTimezone() {
}
fclose(f);
- }
- char *lineEnd = strstr(buf, "\n");
- if (lineEnd != NULL) {
- *lineEnd = 0;
- }
+ char *lineEnd = strstr(buf, "\n");
+ if (lineEnd != NULL) {
+ *lineEnd = 0;
+ }
- // for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
- if (strlen(buf) > 0) {
- setenv("TZ", buf, 1);
+ // for CentOS system, /etc/timezone does not exist. Ignore the TZ environment variables
+ if (strlen(buf) > 0) {
+ setenv("TZ", buf, 1);
+ }
}
-
// get and set default timezone
tzset();
diff --git a/src/plugins/http/inc/httpLog.h b/src/plugins/http/inc/httpLog.h
index 3712360a1c..f4c20a40d5 100644
--- a/src/plugins/http/inc/httpLog.h
+++ b/src/plugins/http/inc/httpLog.h
@@ -26,8 +26,6 @@ extern int32_t httpDebugFlag;
#define httpInfo(...) { if (httpDebugFlag & DEBUG_INFO) { taosPrintLog("HTP INFO ", 255, __VA_ARGS__); }}
#define httpDebug(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLog("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }}
#define httpTrace(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLog("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
-
-#define httpDebugDump(...) { if (httpDebugFlag & DEBUG_DEBUG) { taosPrintLongString("HTP DEBUG ", httpDebugFlag, __VA_ARGS__); }}
-#define httpTraceDump(...) { if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
+#define httpTraceL(...){ if (httpDebugFlag & DEBUG_TRACE) { taosPrintLongString("HTP TRACE ", httpDebugFlag, __VA_ARGS__); }}
#endif
diff --git a/src/plugins/http/src/httpHandle.c b/src/plugins/http/src/httpHandle.c
index 056fe425d4..a89ea7d8f1 100644
--- a/src/plugins/http/src/httpHandle.c
+++ b/src/plugins/http/src/httpHandle.c
@@ -313,9 +313,9 @@ bool httpParseRequest(HttpContext* pContext) {
return true;
}
- httpTraceDump("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd,
- pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize,
- pContext->parser.buffer);
+ httpTraceL("context:%p, fd:%d, ip:%s, thread:%s, numOfFds:%d, read size:%d, raw data:\n%s", pContext, pContext->fd,
+ pContext->ipstr, pContext->pThread->label, pContext->pThread->numOfFds, pContext->parser.bufsize,
+ pContext->parser.buffer);
if (!httpGetHttpMethod(pContext)) {
return false;
diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c
index a5009c2347..d7d7da6668 100644
--- a/src/plugins/http/src/httpServer.c
+++ b/src/plugins/http/src/httpServer.c
@@ -108,7 +108,7 @@ bool httpReadDataImp(HttpContext *pContext) {
static bool httpDecompressData(HttpContext *pContext) {
if (pContext->contentEncoding != HTTP_COMPRESS_GZIP) {
- httpTraceDump("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos);
+ httpTraceL("context:%p, fd:%d, ip:%s, content:%s", pContext, pContext->fd, pContext->ipstr, pContext->parser.data.pos);
return true;
}
@@ -124,8 +124,8 @@ static bool httpDecompressData(HttpContext *pContext) {
if (ret == 0) {
memcpy(pContext->parser.data.pos, decompressBuf, decompressBufLen);
pContext->parser.data.pos[decompressBufLen] = 0;
- httpTraceDump("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s",
- pContext, pContext->fd, pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf);
+ httpTraceL("context:%p, fd:%d, ip:%s, rawSize:%d, decompressSize:%d, content:%s", pContext, pContext->fd,
+ pContext->ipstr, pContext->parser.data.len, decompressBufLen, decompressBuf);
pContext->parser.data.len = decompressBufLen;
} else {
httpError("context:%p, fd:%d, ip:%s, failed to decompress data, rawSize:%d, error:%d",
diff --git a/src/plugins/http/src/httpSql.c b/src/plugins/http/src/httpSql.c
index 9d3efca01d..c43d928d1b 100644
--- a/src/plugins/http/src/httpSql.c
+++ b/src/plugins/http/src/httpSql.c
@@ -166,8 +166,8 @@ void httpProcessMultiSql(HttpContext *pContext) {
HttpSqlCmd *cmd = multiCmds->cmds + multiCmds->pos;
char *sql = httpGetCmdsString(pContext, cmd->sql);
- httpTraceDump("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd,
- pContext->ipstr, pContext->user, multiCmds->pos, sql);
+ httpTraceL("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start query, sql:%s", pContext, pContext->fd,
+ pContext->ipstr, pContext->user, multiCmds->pos, sql);
taosNotePrintHttp(sql);
taos_query_a(pContext->session->taos, sql, httpProcessMultiSqlCallBack, (void *)pContext);
}
@@ -306,8 +306,8 @@ void httpProcessSingleSqlCmd(HttpContext *pContext) {
return;
}
- httpTraceDump("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr,
- pContext->user, sql);
+ httpTraceL("context:%p, fd:%d, ip:%s, user:%s, start query, sql:%s", pContext, pContext->fd, pContext->ipstr,
+ pContext->user, sql);
taosNotePrintHttp(sql);
taos_query_a(pSession->taos, sql, httpProcessSingleSqlCallBack, (void *)pContext);
}
diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c
index 11b701e0ea..36468b3fdb 100644
--- a/src/plugins/monitor/src/monitorMain.c
+++ b/src/plugins/monitor/src/monitorMain.c
@@ -35,9 +35,6 @@
#define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }}
#define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }}
-#define monitorDebugDump(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MON DEBUG ", monitorDebugFlag, __VA_ARGS__); }}
-#define monitorTraceDump(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLongString("MON TRACE ", monitorDebugFlag, __VA_ARGS__); }}
-
#define SQL_LENGTH 1024
#define LOG_LEN_STR 100
#define IP_LEN_STR 18
diff --git a/src/plugins/mqtt/inc/mqttLog.h b/src/plugins/mqtt/inc/mqttLog.h
index c0515c2c26..5d5f98a13b 100644
--- a/src/plugins/mqtt/inc/mqttLog.h
+++ b/src/plugins/mqtt/inc/mqttLog.h
@@ -27,7 +27,4 @@ extern int32_t mqttDebugFlag;
#define mqttDebug(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLog("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
#define mqttTrace(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLog("MQT TRACE ", mqttDebugFlag, __VA_ARGS__); }}
-#define mqttDebugDump(...) { if (mqttDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
-#define mqttTraceDump(...) { if (mqttDebugFlag & DEBUG_TRACE) { taosPrintLongString("MQT DEBUG ", mqttDebugFlag, __VA_ARGS__); }}
-
#endif
diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h
index 3aa1b60be5..73124b8fc6 100644
--- a/src/query/inc/qExecutor.h
+++ b/src/query/inc/qExecutor.h
@@ -154,6 +154,7 @@ typedef struct SQuery {
} SQuery;
typedef struct SQueryRuntimeEnv {
+ jmp_buf env;
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;
SQLFunctionCtx* pCtx;
@@ -169,6 +170,8 @@ typedef struct SQueryRuntimeEnv {
void* pSecQueryHandle; // another thread for
bool stableQuery; // super table query or not
bool topBotQuery; // false
+ bool groupbyNormalCol; // denote if this is a groupby normal column query
+ bool hasTagResults; // if there are tag values in final result or not
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
} SQueryRuntimeEnv;
@@ -197,8 +200,10 @@ typedef struct SQInfo {
*/
int32_t tableIndex;
int32_t numOfGroupResultPages;
- _qinfo_free_fn_t freeFn;
- jmp_buf env;
+ _qinfo_free_fn_t freeFn; //todo remove it
+
+ void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables;
+
} SQInfo;
#endif // TDENGINE_QUERYEXECUTOR_H
diff --git a/src/query/inc/qfill.h b/src/query/inc/qfill.h
index da1cd8e5de..ee5974708a 100644
--- a/src/query/inc/qfill.h
+++ b/src/query/inc/qfill.h
@@ -60,8 +60,6 @@ typedef struct SPoint {
void * val;
} SPoint;
-int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision);
-
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType,
SFillColInfo* pFillCol);
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index 55cb35fdf9..bd8c9951e1 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -100,6 +100,16 @@ static UNUSED_FUNC void *u_malloc (size_t __size) {
}
}
+static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
+ uint32_t v = rand();
+ if (v % 5 <= 1) {
+ return NULL;
+ } else {
+ return calloc(num, __size);
+ }
+}
+
+#define calloc u_calloc
#define malloc u_malloc
#endif
@@ -109,6 +119,7 @@ static UNUSED_FUNC void *u_malloc (size_t __size) {
static void setQueryStatus(SQuery *pQuery, int8_t status);
+#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->intervalTime > 0)
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
// todo move to utility
@@ -314,6 +325,24 @@ static bool isTopBottomQuery(SQuery *pQuery) {
return false;
}
+static bool hasTagValOutput(SQuery* pQuery) {
+ SExprInfo *pExprInfo = &pQuery->pSelectExpr[0];
+ if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
+ return true;
+ } else { // set tag value, by which the results are aggregated.
+ for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
+ SExprInfo *pLocalExprInfo = &pQuery->pSelectExpr[idx];
+
+ // ts_comp column required the tag value for join filter
+ if (TSDB_COL_IS_TAG(pLocalExprInfo->base.colInfo.flag)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+}
+
static SDataStatis *getStatisInfo(SQuery *pQuery, SDataStatis *pStatis, int32_t numOfCols, int32_t index) {
// for a tag column, no corresponding field info
SColIndex *pColIndex = &pQuery->pSelectExpr[index].base.colInfo;
@@ -657,26 +686,24 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
- if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
- for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
- int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
+ for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
+ int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
- pCtx[k].nStartQueryTimestamp = pWin->skey;
- pCtx[k].size = forwardStep;
- pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
+ pCtx[k].nStartQueryTimestamp = pWin->skey;
+ pCtx[k].size = forwardStep;
+ pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
- if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
- pCtx[k].ptsList = &tsBuf[offset];
- }
+ if ((aAggs[functionId].nStatus & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
+ pCtx[k].ptsList = &tsBuf[offset];
+ }
- // not a whole block involved in query processing, statistics data can not be used
- if (forwardStep != numOfTotal) {
- pCtx[k].preAggVals.isSet = false;
- }
-
- if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
- aAggs[functionId].xFunction(&pCtx[k]);
- }
+ // not a whole block involved in query processing, statistics data can not be used
+ if (forwardStep != numOfTotal) {
+ pCtx[k].preAggVals.isSet = false;
+ }
+
+ if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
+ aAggs[functionId].xFunction(&pCtx[k]);
}
}
}
@@ -686,14 +713,12 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
SQuery * pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
- if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
- for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
- pCtx[k].nStartQueryTimestamp = pWin->skey;
+ for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
+ pCtx[k].nStartQueryTimestamp = pWin->skey;
- int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
- if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
- aAggs[functionId].xFunctionF(&pCtx[k], offset);
- }
+ int32_t functionId = pQuery->pSelectExpr[k].base.functionId;
+ if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
+ aAggs[functionId].xFunctionF(&pCtx[k], offset);
}
}
}
@@ -787,8 +812,8 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
return NULL;
}
char *dataBlock = NULL;
- SQuery *pQuery = pRuntimeEnv->pQuery;
+ SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
int32_t functionId = pQuery->pSelectExpr[col].base.functionId;
@@ -807,6 +832,10 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
sas->numOfCols = pQuery->numOfCols;
sas->data = calloc(pQuery->numOfCols, POINTER_BYTES);
+ if (sas->data == NULL) {
+ longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
+
// here the pQuery->colList and sas->colList are identical
int32_t numOfCols = taosArrayGetSize(pDataBlock);
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
@@ -860,6 +889,9 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
+ if (sasArray == NULL) {
+ longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock);
@@ -867,7 +899,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
- if (isIntervalQuery(pQuery) && tsCols != NULL) {
+ if (QUERY_IS_INTERVAL_QUERY(pQuery) && tsCols != NULL) {
int32_t offset = GET_COL_DATA_POS(pQuery, 0, step);
TSKEY ts = tsCols[offset];
@@ -1051,6 +1083,11 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SQuery* pQuery = pRuntimeEnv->pQuery;
+
+ // in case of timestamp column, always generated results.
+ if (functionId == TSDB_FUNC_TS) {
+ return true;
+ }
if (pResInfo->complete || functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
return false;
@@ -1063,7 +1100,6 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
// todo add comments
if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) {
return pCtx->param[0].i64Key == pQuery->order.order;
-// return !QUERY_IS_ASC_QUERY(pQuery);
}
// in the supplementary scan, only the following functions need to be executed
@@ -1084,8 +1120,12 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0);
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
- bool groupbyColumnValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
+ bool groupbyColumnValue = pRuntimeEnv->groupbyNormalCol;
+
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
+ if (sasArray == NULL) {
+ longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
int16_t type = 0;
int16_t bytes = 0;
@@ -1231,7 +1271,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
STableQueryInfo* pTableQInfo = pQuery->current;
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
- if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
+ if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
} else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
@@ -1352,14 +1392,16 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
}
// set the output buffer for the selectivity + tag query
-static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
+static void setCtxTagColumnInfo(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) {
+ SQuery* pQuery = pRuntimeEnv->pQuery;
+
if (isSelectivityWithTagsQuery(pQuery)) {
int32_t num = 0;
int16_t tagLen = 0;
SQLFunctionCtx *p = NULL;
SQLFunctionCtx **pTagCtx = calloc(pQuery->numOfOutput, POINTER_BYTES);
-
+
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SSqlFuncMsg *pSqlFuncMsg = &pQuery->pSelectExpr[i].base;
@@ -1386,7 +1428,7 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
}
}
-static void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
+static FORCE_INLINE void setWindowResultInfo(SResultInfo *pResultInfo, SQuery *pQuery, bool isStableQuery) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
assert(pQuery->pSelectExpr[i].interBytes <= DEFAULT_INTERN_BUF_PAGE_SIZE);
@@ -1477,7 +1519,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
resetCtxOutputBuf(pRuntimeEnv);
}
- setCtxTagColumnInfo(pQuery, pRuntimeEnv->pCtx);
+ setCtxTagColumnInfo(pRuntimeEnv, pRuntimeEnv->pCtx);
return TSDB_CODE_SUCCESS;
_clean:
@@ -1640,8 +1682,7 @@ static bool onlyQueryTags(SQuery* pQuery) {
void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *realWin, STimeWindow *win) {
assert(key >= keyFirst && key <= keyLast && pQuery->slidingTime <= pQuery->intervalTime);
- win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->slidingTimeUnit, pQuery->precision);
-
+ win->skey = taosGetIntervalStartTimestamp(key, pQuery->slidingTime, pQuery->intervalTime, pQuery->slidingTimeUnit, pQuery->precision);
if (keyFirst > (INT64_MAX - pQuery->intervalTime)) {
/*
* if the realSkey > INT64_MAX - pQuery->intervalTime, the query duration between
@@ -2117,7 +2158,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
SQuery* pQuery = pRuntimeEnv->pQuery;
- if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) {
+ if (!QUERY_IS_INTERVAL_QUERY(pQuery) && !pRuntimeEnv->groupbyNormalCol && !isFixedOutputQuery(pQuery)) {
SResultRec *pRec = &pQuery->rec;
if (pQuery->rec.capacity - pQuery->rec.rows < pBlockInfo->rows) {
@@ -2171,7 +2212,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pQueryHandle);
// todo extract methods
- if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
+ if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
STimeWindow realWin = TSWINDOW_INITIALIZER, w = TSWINDOW_INITIALIZER;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
@@ -2217,7 +2258,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
setQueryStatus(pQuery, QUERY_COMPLETED);
}
- if (isIntervalQuery(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
+ if (QUERY_IS_INTERVAL_QUERY(pQuery) && IS_MASTER_SCAN(pRuntimeEnv)) {
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
@@ -2638,7 +2679,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tfree(pTableList);
qError("QInfo:%p failed alloc memory", pQInfo);
- longjmp(pQInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
+ longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// todo opt for the case of one table per group
@@ -2646,7 +2687,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
for (int32_t i = 0; i < size; ++i) {
STableQueryInfo *item = taosArrayGetP(pGroup, i);
- SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, tsdbGetTableId(item->pTable).tid);
+ SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
if (list.size > 0 && item->windowResInfo.size > 0) {
pTableList[numOfTables] = item;
numOfTables += 1;
@@ -2669,6 +2710,10 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
tLoserTreeCreate(&pTree, numOfTables, &cs, tableResultComparFn);
SResultInfo *pResultInfo = calloc(pQuery->numOfOutput, sizeof(SResultInfo));
+ if (pResultInfo == NULL) {
+ longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
+ }
+
setWindowResultInfo(pResultInfo, pQuery, pRuntimeEnv->stableQuery);
resetMergeResultBuf(pQuery, pRuntimeEnv->pCtx, pResultInfo);
@@ -2869,7 +2914,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
// group by normal columns and interval query on normal table
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
- if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
+ if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
disableFuncInReverseScanImpl(pQInfo, pWindowResInfo, order);
} else { // for simple result of table query,
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { // todo refactor
@@ -3044,7 +3089,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
- if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
+ if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
@@ -3236,10 +3281,10 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
- if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
+ if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
// for each group result, call the finalize function for each column
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
- if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
+ if (pRuntimeEnv->groupbyNormalCol) {
closeAllTimeWindow(pWindowResInfo);
}
@@ -3281,10 +3326,10 @@ static bool hasMainOutput(SQuery *pQuery) {
return false;
}
-static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win) {
+static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void* pTable, STimeWindow win, void* buf) {
SQuery* pQuery = pRuntimeEnv->pQuery;
- STableQueryInfo *pTableQueryInfo = calloc(1, sizeof(STableQueryInfo));
+ STableQueryInfo *pTableQueryInfo = buf;//calloc(1, sizeof(STableQueryInfo));
pTableQueryInfo->win = win;
pTableQueryInfo->lastKey = win.skey;
@@ -3295,7 +3340,8 @@ static STableQueryInfo *createTableQueryInfo( SQueryRuntimeEnv *pRuntimeEnv, voi
int32_t initialSize = 1;
int32_t initialThreshold = 1;
- if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
+ // set more initial size of interval/groupby query
+ if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) {
initialSize = 20;
initialThreshold = 100;
}
@@ -3310,7 +3356,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
}
cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols);
- free(pTableQueryInfo);
}
#define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \
@@ -3323,7 +3368,6 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
/**
* set output buffer for different group
- * TODO opt performance if current group is identical to previous group
* @param pRuntimeEnv
* @param pDataBlockInfo
*/
@@ -3334,7 +3378,10 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
// lastKey needs to be updated
pTableQueryInfo->lastKey = nextKey;
- setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
+
+ if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
+ setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
+ }
if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) {
return;
@@ -3523,7 +3570,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
int32_t totalSubset = 0;
- if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
+ if (pQInfo->runtimeEnv.groupbyNormalCol || (isIntervalQuery(pQuery))) {
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
} else {
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
@@ -3617,11 +3664,11 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
assert(pQuery->rec.rows <= pQuery->rec.capacity);
}
-static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
+static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
// update the number of result for each, only update the number of rows for the corresponding window result.
- if (pQuery->intervalTime == 0) {
+ if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) {
SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i];
@@ -3635,14 +3682,6 @@ static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, S
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
}
}
-
-// int32_t g = pTableQueryInfo->groupIndex;
-// assert(pRuntimeEnv->windowResInfo.size > 0);
-//
-// SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
-// if (pWindowRes->numOfRows == 0) {
-// pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv);
-// }
}
}
@@ -3654,7 +3693,7 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
- if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
+ if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
} else {
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
@@ -3696,7 +3735,7 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
} else {
// there are results waiting for returned to client.
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
- (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) &&
+ (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) &&
(pRuntimeEnv->windowResInfo.size > 0)) {
return true;
}
@@ -4101,6 +4140,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
pRuntimeEnv->cur.vgroupIndex = -1;
pRuntimeEnv->stableQuery = isSTableQuery;
pRuntimeEnv->prevGroupId = INT32_MIN;
+ pRuntimeEnv->groupbyNormalCol = isGroupbyNormalCol(pQuery->pGroupbyExpr);
if (pTsBuf != NULL) {
int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
@@ -4125,7 +4165,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
if (pQuery->intervalTime == 0) {
int16_t type = TSDB_DATA_TYPE_NULL;
- if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags;
+ if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
} else {
type = TSDB_DATA_TYPE_INT; // group id
@@ -4134,7 +4174,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type);
}
- } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) {
+ } else if (pRuntimeEnv->groupbyNormalCol || isIntervalQuery(pQuery)) {
int32_t rows = getInitialPageNum(pQInfo);
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
if (code != TSDB_CODE_SUCCESS) {
@@ -4142,7 +4182,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
}
int16_t type = TSDB_DATA_TYPE_NULL;
- if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
+ if (pRuntimeEnv->groupbyNormalCol) {
type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr);
} else {
type = TSDB_DATA_TYPE_TIMESTAMP;
@@ -4160,8 +4200,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
// todo refactor
pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery);
- setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
+ pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery);
+ setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
return TSDB_CODE_SUCCESS;
}
@@ -4202,14 +4243,17 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
- if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
+ if (!pRuntimeEnv->groupbyNormalCol) {
if (!isIntervalQuery(pQuery)) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
} else { // interval query
TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey);
- /*int32_t ret = */setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
+
+ if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
+ setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
+ }
}
}
@@ -4234,9 +4278,9 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
- STableId id = tsdbGetTableId(pCheckInfo->pTable);
+ STableId* id = TSDB_TABLEID(pCheckInfo->pTable);
qDebug("QInfo:%p query on (%d): uid:%" PRIu64 ", tid:%d, qrange:%" PRId64 "-%" PRId64, pQInfo, index,
- id.uid, id.tid, pCheckInfo->lastKey, pCheckInfo->win.ekey);
+ id->uid, id->tid, pCheckInfo->lastKey, pCheckInfo->win.ekey);
STsdbQueryCond cond = {
.twindow = {pCheckInfo->lastKey, pCheckInfo->win.ekey},
@@ -4365,7 +4409,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
break;
}
}
- } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query
+ } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query
while (pQInfo->groupIndex < numOfGroups) {
SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex);
@@ -4503,11 +4547,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
*/
pQInfo->tableIndex++;
- STableIdInfo tidInfo;
- STableId id = tsdbGetTableId(pQuery->current->pTable);
+ STableIdInfo tidInfo = {0};
- tidInfo.uid = id.uid;
- tidInfo.tid = id.tid;
+ STableId* id = TSDB_TABLEID(pQuery->current->pTable);
+ tidInfo.uid = id->uid;
+ tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
@@ -4609,6 +4653,8 @@ static void doRestoreContext(SQInfo *pQInfo) {
static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
+ int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
+
if (isIntervalQuery(pQuery)) {
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
for (int32_t i = 0; i < numOfGroup; ++i) {
@@ -4618,6 +4664,7 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
for (int32_t j = 0; j < num; ++j) {
STableQueryInfo* item = taosArrayGetP(group, j);
closeAllTimeWindow(&item->windowResInfo);
+ removeRedundantWindow(&item->windowResInfo, item->lastKey - step, step);
}
}
} else { // close results for group result
@@ -4669,6 +4716,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
el = scanMultiTableDataBlocks(pQInfo);
qDebug("QInfo:%p reversed scan completed, elapsed time: %" PRId64 "ms", pQInfo, el);
+ doCloseAllTimeWindowAfterScan(pQInfo);
doRestoreContext(pQInfo);
} else {
qDebug("QInfo:%p no need to do reversed scan, query completed", pQInfo);
@@ -4681,7 +4729,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
return;
}
- if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) {
+ if (QUERY_IS_INTERVAL_QUERY(pQuery) || isSumAvgRateQuery(pQuery)) {
if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) {
copyResToQueryResultBuf(pQInfo, pQuery);
@@ -4778,10 +4826,10 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
pQuery->current->lastKey, pQuery->window.ekey);
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
STableIdInfo tidInfo;
- STableId id = tsdbGetTableId(pQuery->current);
+ STableId* id = TSDB_TABLEID(pQuery->current);
- tidInfo.uid = id.uid;
- tidInfo.tid = id.tid;
+ tidInfo.uid = id->uid;
+ tidInfo.tid = id->tid;
tidInfo.key = pQuery->current->lastKey;
taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo);
}
@@ -4871,7 +4919,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
// all data scanned, the group by normal column can return
- if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result
+ if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result
pQInfo->groupIndex = 0;
pQuery->rec.rows = 0;
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
@@ -4932,7 +4980,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
STableQueryInfo* item = taosArrayGetP(g, 0);
// group by normal column, sliding window query, interval query are handled by interval query processor
- if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
+ if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { // interval (down sampling operation)
tableIntervalProcess(pQInfo, item);
} else if (isFixedOutputQuery(pQuery)) {
tableFixedOutputProcess(pQInfo, item);
@@ -4947,18 +4995,19 @@ static void tableQueryImpl(SQInfo *pQInfo) {
}
static void stableQueryImpl(SQInfo *pQInfo) {
- SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
+ SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
+ SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->rec.rows = 0;
int64_t st = taosGetTimestampUs();
- if (isIntervalQuery(pQuery) ||
- (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) &&
+ if (QUERY_IS_INTERVAL_QUERY(pQuery) ||
+ (isFixedOutputQuery(pQuery) && (!isPointInterpoQuery(pQuery)) && !pRuntimeEnv->groupbyNormalCol &&
!isFirstLastRowQuery(pQuery))) {
multiTableQueryProcess(pQInfo);
} else {
assert((pQuery->checkBuffer == 1 && pQuery->intervalTime == 0) || isPointInterpoQuery(pQuery) ||
- isFirstLastRowQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr));
+ isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
sequentialTableProcess(pQInfo);
}
@@ -5653,28 +5702,33 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
STimeWindow window = pQueryMsg->window;
taosArraySort(pTableIdList, compareTableIdInfo);
+ // TODO optimize the STableQueryInfo malloc strategy
+ pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
+ int32_t index = 0;
+
for(int32_t i = 0; i < numOfGroups; ++i) {
SArray* pa = taosArrayGetP(pTableGroupInfo->pGroupList, i);
- size_t s = taosArrayGetSize(pa);
+ size_t s = taosArrayGetSize(pa);
SArray* p1 = taosArrayInit(s, POINTER_BYTES);
for(int32_t j = 0; j < s; ++j) {
void* pTable = taosArrayGetP(pa, j);
+ STableId* id = TSDB_TABLEID(pTable);
- // NOTE: compare STableIdInfo with STableId
- STableId id = tsdbGetTableId(pTable);
- STableIdInfo* pTableId = taosArraySearch(pTableIdList, &id, compareTableIdInfo);
+ STableIdInfo* pTableId = taosArraySearch(pTableIdList, id, compareTableIdInfo);
if (pTableId != NULL ) {
window.skey = pTableId->key;
} else {
window.skey = pQueryMsg->window.skey;
}
- STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window);
+ void* buf = pQInfo->pBuf + index * sizeof(STableQueryInfo);
+ STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, pTable, window, buf);
item->groupIndex = i;
taosArrayPush(p1, &item);
- taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id.tid, sizeof(id.tid), &item, POINTER_BYTES);
+ taosHashPut(pQInfo->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
+ index += 1;
}
taosArrayPush(pQInfo->tableqinfoGroupInfo.pGroupList, &p1);
@@ -5818,6 +5872,7 @@ static void freeQInfo(SQInfo *pQInfo) {
taosArrayDestroy(p);
}
+ tfree(pQInfo->pBuf);
taosArrayDestroy(pQInfo->tableqinfoGroupInfo.pGroupList);
taosHashCleanup(pQInfo->tableqinfoGroupInfo.map);
tsdbDestoryTableGroup(&pQInfo->tableGroupInfo);
@@ -6094,7 +6149,8 @@ void qTableQuery(qinfo_t qinfo) {
return;
}
- int32_t ret = setjmp(pQInfo->env);
+ int32_t ret = setjmp(pQInfo->runtimeEnv.env);
+
// error occurs, record the error code and return to client
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
@@ -6277,13 +6333,13 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
output = varDataVal(output);
- STableId id = tsdbGetTableId(item->pTable);
+ STableId* id = TSDB_TABLEID(item->pTable);
- *(int64_t *)output = id.uid; // memory align problem, todo serialize
- output += sizeof(id.uid);
+ *(int64_t *)output = id->uid; // memory align problem, todo serialize
+ output += sizeof(id->uid);
- *(int32_t *)output = id.tid;
- output += sizeof(id.tid);
+ *(int32_t *)output = id->tid;
+ output += sizeof(id->tid);
*(int32_t *)output = pQInfo->vgId;
output += sizeof(pQInfo->vgId);
diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c
index d7725f561d..3b2b2bfff0 100644
--- a/src/query/src/qUtil.c
+++ b/src/query/src/qUtil.c
@@ -32,7 +32,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->threshold = threshold;
pWindowResInfo->type = type;
-
_hash_fn_t fn = taosGetDefaultHashFunction(type);
pWindowResInfo->hashList = taosHashInit(threshold, fn, false);
@@ -54,7 +53,8 @@ void destroyTimeWindowRes(SWindowResult *pWindowRes, int32_t nOutputCols) {
if (pWindowRes == NULL) {
return;
}
-
+
+ // TODO opt malloc strategy
for (int32_t i = 0; i < nOutputCols; ++i) {
free(pWindowRes->resultInfo[i].interResultBuf);
}
@@ -180,19 +180,34 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
/*
* remove the results that are not the FIRST time window that spreads beyond the
- * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time
+ * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
+ * NOTE: remove redundant, only when the result set order equals to traverse order
*/
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
-
- int32_t i = 0;
- while (i < pWindowResInfo->size &&
- ((pWindowResInfo->pResult[i].window.ekey < lastKey && order == QUERY_ASC_FORWARD_STEP) ||
- (pWindowResInfo->pResult[i].window.skey > lastKey && order == QUERY_DESC_FORWARD_STEP))) {
- ++i;
+ if (pWindowResInfo->size <= 1) {
+ return;
}
-
- // assert(i < pWindowResInfo->size);
+
+ // get the result order
+ int32_t resultOrder = (pWindowResInfo->pResult[0].window.skey < pWindowResInfo->pResult[1].window.skey)?
+ TSDB_ORDER_ASC:TSDB_ORDER_DESC;
+
+ if (order != resultOrder) {
+ return;
+ }
+
+ int32_t i = 0;
+ if (order == QUERY_ASC_FORWARD_STEP) {
+ while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].window.ekey < lastKey)) {
+ ++i;
+ }
+ } else if (order == QUERY_DESC_FORWARD_STEP) {
+ while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].window.skey > lastKey)) {
+ ++i;
+ }
+ }
+
if (i < pWindowResInfo->size) {
pWindowResInfo->size = (i + 1);
}
diff --git a/src/query/src/qextbuffer.c b/src/query/src/qextbuffer.c
index ce3f60c072..afcf902123 100644
--- a/src/query/src/qextbuffer.c
+++ b/src/query/src/qextbuffer.c
@@ -118,7 +118,7 @@ static bool tExtMemBufferAlloc(tExtMemBuffer *pMemBuffer) {
* To flush data to disk to accommodate more data
*/
if (pMemBuffer->numOfInMemPages > 0 && pMemBuffer->numOfInMemPages == pMemBuffer->inMemCapacity) {
- if (!tExtMemBufferFlush(pMemBuffer)) {
+ if (tExtMemBufferFlush(pMemBuffer) != 0) {
return false;
}
}
@@ -268,6 +268,7 @@ int32_t tExtMemBufferFlush(tExtMemBuffer *pMemBuffer) {
size_t retVal = fwrite((char *)&(first->item), pMemBuffer->pageSize, 1, pMemBuffer->file);
if (retVal <= 0) { // failed to write to buffer, may be not enough space
ret = TAOS_SYSTEM_ERROR(errno);
+ return ret;
}
pMemBuffer->fileMeta.numOfElemsInFile += first->item.num;
diff --git a/src/query/src/qfill.c b/src/query/src/qfill.c
index 4cb3779166..eebe9a976b 100644
--- a/src/query/src/qfill.c
+++ b/src/query/src/qfill.c
@@ -22,41 +22,6 @@
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
-int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, char timeUnit, int16_t precision) {
- if (slidingTime == 0) {
- return startTime;
- }
-
- if (timeUnit == 'a' || timeUnit == 'm' || timeUnit == 's' || timeUnit == 'h') {
- return (startTime / slidingTime) * slidingTime;
- } else {
- /*
- * here we revised the start time of day according to the local time zone,
- * but in case of DST, the start time of one day need to be dynamically decided.
- *
- * TODO dynamically decide the start time of a day, move to common module
- */
-
- // todo refactor to extract function that is available for Linux/Windows/Mac platform
-#if defined(WINDOWS) && _MSC_VER >= 1900
- // see https://docs.microsoft.com/en-us/cpp/c-runtime-library/daylight-dstbias-timezone-and-tzname?view=vs-2019
- int64_t timezone = _timezone;
- int32_t daylight = _daylight;
- char** tzname = _tzname;
-#endif
-
- int64_t t = (precision == TSDB_TIME_PRECISION_MILLI) ? MILLISECOND_PER_SECOND : MILLISECOND_PER_SECOND * 1000L;
-
- int64_t revStartime = (startTime / slidingTime) * slidingTime + timezone * t;
- int64_t revEndtime = revStartime + slidingTime - 1;
- if (revEndtime < startTime) {
- revStartime += slidingTime;
- }
-
- return revStartime;
- }
-}
-
SFillInfo* taosInitFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SFillColInfo* pFillCol) {
if (fillType == TSDB_FILL_NONE) {
@@ -128,7 +93,7 @@ static TSKEY taosGetRevisedEndKey(TSKEY ekey, int32_t order, int64_t timeInterva
if (order == TSDB_ORDER_ASC) {
return ekey;
} else {
- return taosGetIntervalStartTimestamp(ekey, timeInterval, slidingTimeUnit, precision);
+ return taosGetIntervalStartTimestamp(ekey, timeInterval, timeInterval, slidingTimeUnit, precision);
}
}
diff --git a/src/rpc/inc/rpcLog.h b/src/rpc/inc/rpcLog.h
index 0504ddac43..f0f5c84ff9 100644
--- a/src/rpc/inc/rpcLog.h
+++ b/src/rpc/inc/rpcLog.h
@@ -31,9 +31,7 @@ extern int32_t tscEmbedded;
#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC INFO ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC DEBUG ", rpcDebugFlag, __VA_ARGS__); }}
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC TRACE ", rpcDebugFlag, __VA_ARGS__); }}
-
-#define tDebugDump(x, y) { if (rpcDebugFlag & DEBUG_DEBUG) { taosDumpData((unsigned char *)x, y); }}
-#define tTraceDump(x, y) { if (rpcDebugFlag & DEBUG_TRACE) { taosDumpData((unsigned char *)x, y); }}
+#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); }}
#ifdef __cplusplus
}
diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c
index c05c8c76e1..6a293520e5 100644
--- a/src/rpc/src/rpcMain.c
+++ b/src/rpc/src/rpcMain.c
@@ -973,7 +973,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
SRpcInfo *pRpc = (SRpcInfo *)pRecv->shandle;
SRpcConn *pConn = (SRpcConn *)pRecv->thandle;
- tTraceDump(pRecv->msg, pRecv->msgLen);
+ tDump(pRecv->msg, pRecv->msgLen);
// underlying UDP layer does not know it is server or client
pRecv->connType = pRecv->connType | pRpc->connType;
@@ -1247,7 +1247,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
}
- tTraceDump(msg, msgLen);
+ tDump(msg, msgLen);
}
static void rpcProcessConnError(void *param, void *id) {
diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h
index b736b8824c..b84a2c1243 100644
--- a/src/tsdb/inc/tsdbMain.h
+++ b/src/tsdb/inc/tsdbMain.h
@@ -47,9 +47,9 @@ extern int tsdbDebugFlag;
// Definitions
// ------------------ tsdbMeta.c
typedef struct STable {
+ STableId tableId;
ETableType type;
tstr* name; // NOTE: there a flexible string here
- STableId tableId;
uint64_t suid;
struct STable* pSuper; // super table pointer
uint8_t numOfSchemas;
@@ -294,11 +294,34 @@ typedef struct {
#define TABLE_SUID(t) (t)->suid
#define TABLE_LASTKEY(t) (t)->lastKey
+static FORCE_INLINE STSchema *tsdbGetTableSchema(STable *pTable) {
+ if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
+ STable *pSuper = pTable->pSuper;
+ if (pSuper == NULL) return NULL;
+ return pSuper->schema[pSuper->numOfSchemas - 1];
+ } else if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
+ return pTable->schema[pTable->numOfSchemas - 1];
+ } else {
+ return NULL;
+ }
+}
+
+static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
+ if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
+ STable *pSuper = pTable->pSuper;
+ if (pSuper == NULL) return NULL;
+ return pSuper->tagSchema;
+ } else if (pTable->type == TSDB_SUPER_TABLE) {
+ return pTable->tagSchema;
+ } else {
+ return NULL;
+ }
+}
+
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
void tsdbFreeMeta(STsdbMeta* pMeta);
int tsdbOpenMeta(STsdbRepo* pRepo);
int tsdbCloseMeta(STsdbRepo* pRepo);
-STSchema* tsdbGetTableSchema(STable* pTable);
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version);
STSchema* tsdbGetTableTagSchema(STable* pTable);
diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c
index d474db876c..14da6cb456 100644
--- a/src/tsdb/src/tsdbMeta.c
+++ b/src/tsdb/src/tsdbMeta.c
@@ -282,6 +282,8 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
pMsg->tid = htonl(pMsg->tid);
pMsg->tversion = htons(pMsg->tversion);
pMsg->colId = htons(pMsg->colId);
+ pMsg->type = htons(pMsg->type);
+ pMsg->bytes = htons(pMsg->bytes);
pMsg->tagValLen = htonl(pMsg->tagValLen);
pMsg->numOfTags = htons(pMsg->numOfTags);
pMsg->schemaLen = htonl(pMsg->schemaLen);
diff --git a/src/util/inc/tlog.h b/src/util/inc/tlog.h
index af4e6ae1c2..1f6a81d4b4 100644
--- a/src/util/inc/tlog.h
+++ b/src/util/inc/tlog.h
@@ -26,6 +26,7 @@ extern "C" {
#define DEBUG_INFO DEBUG_WARN
#define DEBUG_DEBUG 4U
#define DEBUG_TRACE 8U
+#define DEBUG_DUMP 16U
#define DEBUG_SCREEN 64U
#define DEBUG_FILE 128U
diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c
index 97eeda010e..eb7a2d5a66 100644
--- a/src/util/src/tfile.c
+++ b/src/util/src/tfile.c
@@ -26,12 +26,12 @@
#include "os.h"
-#define RANDOM_FACTOR 5
+#define RANDOM_FILE_FAIL_FACTOR 5
ssize_t taos_tread(int fd, void *buf, size_t count)
{
#ifdef TAOS_RANDOM_FILE_FAIL
- if (rand() % RANDOM_FACTOR == 0) {
+ if (rand() % RANDOM_FILE_FAIL_FACTOR == 0) {
errno = EIO;
return -1;
}
@@ -43,7 +43,7 @@ ssize_t taos_tread(int fd, void *buf, size_t count)
ssize_t taos_twrite(int fd, void *buf, size_t count)
{
#ifdef TAOS_RANDOM_FILE_FAIL
- if (rand() % RANDOM_FACTOR == 0) {
+ if (rand() % RANDOM_FILE_FAIL_FACTOR == 0) {
errno = EIO;
return -1;
}
@@ -55,7 +55,7 @@ ssize_t taos_twrite(int fd, void *buf, size_t count)
off_t taos_lseek(int fd, off_t offset, int whence)
{
#ifdef TAOS_RANDOM_FILE_FAIL
- if (rand() % RANDOM_FACTOR == 0) {
+ if (rand() % RANDOM_FILE_FAIL_FACTOR == 0) {
errno = EIO;
return -1;
}
diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c
index 0050de3399..b469a5836e 100644
--- a/src/vnode/src/vnodeMain.c
+++ b/src/vnode/src/vnodeMain.c
@@ -34,8 +34,7 @@
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
-static int32_t tsOpennedVnodes;
-static void *tsDnodeVnodesHash;
+static SHashObj*tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
@@ -47,8 +46,6 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
-static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
-
#ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
@@ -58,25 +55,28 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
#endif
-static void vnodeInit() {
+int32_t vnodeInitResources() {
vnodeInitWriteFp();
vnodeInitReadFp();
tsDnodeVnodesHash = taosHashInit(TSDB_MAX_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true);
if (tsDnodeVnodesHash == NULL) {
vError("failed to init vnode list");
+ return TSDB_CODE_VND_OUT_OF_MEMORY;
}
+
+ return TSDB_CODE_SUCCESS;
}
void vnodeCleanupResources() {
- taosHashCleanup(tsDnodeVnodesHash);
- vnodeModuleInit = PTHREAD_ONCE_INIT;
- tsDnodeVnodesHash = NULL;
+ if (tsDnodeVnodesHash != NULL) {
+ taosHashCleanup(tsDnodeVnodesHash);
+ tsDnodeVnodesHash = NULL;
+ }
}
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
- pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pTemp = (SVnodeObj *)taosHashGet(tsDnodeVnodesHash, (const char *)&pVnodeCfg->cfg.vgId, sizeof(int32_t));
if (pTemp != NULL) {
@@ -144,11 +144,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
}
int32_t vnodeDrop(int32_t vgId) {
- if (tsDnodeVnodesHash == NULL) {
- vDebug("vgId:%d, failed to drop, vgId not exist", vgId);
- return TSDB_CODE_VND_INVALID_VGROUP_ID;
- }
-
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
vDebug("vgId:%d, failed to drop, vgId not find", vgId);
@@ -187,7 +182,6 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN];
- pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
if (pVnode == NULL) {
@@ -195,7 +189,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return TAOS_SYSTEM_ERROR(errno);
}
- atomic_add_fetch_32(&tsOpennedVnodes, 1);
atomic_add_fetch_32(&pVnode->refCount, 1);
pVnode->vgId = vnode;
@@ -366,13 +359,11 @@ void vnodeRelease(void *pVnodeRaw) {
free(pVnode);
- int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
+ int32_t count = taosHashGetSize(tsDnodeVnodesHash);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
}
void *vnodeGetVnode(int32_t vgId) {
- if (tsDnodeVnodesHash == NULL) return NULL;
-
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
@@ -434,8 +425,6 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
- if (tsDnodeVnodesHash == NULL) return TSDB_CODE_SUCCESS;
-
SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash);
while (taosHashIterNext(pIter)) {
SVnodeObj **pVnode = taosHashIterGet(pIter);
diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c
index 354caf2af5..9938b34741 100644
--- a/src/vnode/src/vnodeRead.c
+++ b/src/vnode/src/vnodeRead.c
@@ -110,13 +110,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
if (code == TSDB_CODE_SUCCESS) {
- // add lock here
handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
if (handle == NULL) { // failed to register qhandle
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
qKillQuery(pQInfo);
qKillQuery(pQInfo);
+ pQInfo = NULL;
} else {
assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) (handle));
diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c
index f13598875a..e76b9601a3 100644
--- a/src/vnode/src/vnodeWrite.c
+++ b/src/vnode/src/vnodeWrite.c
@@ -89,6 +89,11 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return syncCode;
}
+void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
+ SVnodeObj *pVnode = (SVnodeObj *)param;
+ syncConfirmForward(pVnode->sync, version, code);
+}
+
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int32_t code = TSDB_CODE_SUCCESS;
diff --git a/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml b/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
index f6728359e5..4e307db079 100644
--- a/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
+++ b/tests/comparisonTest/opentsdb/opentsdbtest/pom.xml
@@ -94,7 +94,7 @@
com.google.guava
guava
- 18.0
+ 24.1.1