diff --git a/cmake/cmake.options b/cmake/cmake.options
index 1a1a5b5d78..946eb5d258 100644
--- a/cmake/cmake.options
+++ b/cmake/cmake.options
@@ -18,6 +18,13 @@ IF(${TD_WINDOWS})
ON
)
+ MESSAGE("build iconv Win32")
+ option(
+ BUILD_WITH_ICONV
+ "If build iconv on Windows"
+ ON
+ )
+
ENDIF ()
IF(${TD_LINUX} MATCHES TRUE)
diff --git a/cmake/iconv_CMakeLists.txt.in b/cmake/iconv_CMakeLists.txt.in
new file mode 100644
index 0000000000..31dfd829fc
--- /dev/null
+++ b/cmake/iconv_CMakeLists.txt.in
@@ -0,0 +1,12 @@
+
+# iconv
+ExternalProject_Add(iconv
+ GIT_REPOSITORY https://github.com/win-iconv/win-iconv.git
+ GIT_TAG v0.0.8
+ SOURCE_DIR "${CMAKE_CONTRIB_DIR}/iconv"
+ BINARY_DIR ""
+ CONFIGURE_COMMAND ""
+ BUILD_COMMAND ""
+ INSTALL_COMMAND ""
+ TEST_COMMAND ""
+ )
\ No newline at end of file
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index f87f660ab3..9cf68b87f9 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -83,6 +83,11 @@ if(${BUILD_WITH_NURAFT})
cat("${CMAKE_SUPPORT_DIR}/nuraft_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_NURAFT})
+# iconv
+if(${BUILD_WITH_ICONV})
+ cat("${CMAKE_SUPPORT_DIR}/iconv_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
+endif(${BUILD_WITH_ICONV})
+
# download dependencies
configure_file(${CONTRIB_TMP_FILE} "${CMAKE_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
@@ -208,14 +213,10 @@ endif(${BUILD_WITH_TRAFT})
# LIBUV
if(${BUILD_WITH_UV})
- if (NOT ${CMAKE_SYSTEM_NAME} MATCHES "Windows")
- MESSAGE("Windows need set no-sign-compare")
- add_compile_options(-Wno-sign-compare)
- endif ()
- if (${CMAKE_SYSTEM_NAME} MATCHES "Windows")
- file(READ "libuv/include/uv.h" CONTENTS)
- string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
- file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}")
+ if (${TD_WINDOWS})
+ file(READ "libuv/include/uv.h" CONTENTS)
+ string(REGEX REPLACE "/([\r]*)\nstruct uv_tcp_s {" "/\\1\ntypedef BOOL (PASCAL *LPFN_CONNECTEX) (SOCKET s, const struct sockaddr* name, int namelen, PVOID lpSendBuffer, DWORD dwSendDataLength,LPDWORD lpdwBytesSent, LPOVERLAPPED lpOverlapped);\\1\nstruct uv_tcp_s {" CONTENTS_NEW "${CONTENTS}")
+ file(WRITE "libuv/include/uv.h" "${CONTENTS_NEW}")
endif ()
add_subdirectory(libuv)
endif(${BUILD_WITH_UV})
@@ -249,10 +250,14 @@ endif(${BUILD_WITH_SQLITE})
# pthread
if(${BUILD_PTHREAD})
- ADD_DEFINITIONS("-DPTW32_STATIC_LIB")
- add_subdirectory(pthread-win32)
+ add_definitions(-DPTW32_STATIC_LIB)
+ add_subdirectory(pthread)
endif(${BUILD_PTHREAD})
+# iconv
+if(${BUILD_WITH_ICONV})
+ add_subdirectory(iconv)
+endif(${BUILD_WITH_ICONV})
# ================================================================================================
# Build test
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 0106a74519..0a06f2174c 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -197,6 +197,11 @@ typedef struct {
};
} SMsgHead;
+typedef struct {
+ int32_t workerType;
+ int32_t streamTaskId;
+} SStreamExecMsgHead;
+
// Submit message for one table
typedef struct SSubmitBlk {
int64_t uid; // table unique id
@@ -1891,9 +1896,9 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
return buf;
}
typedef struct {
- int8_t version; // for compatibility(default 0)
- int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
- int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
+ int8_t version; // for compatibility(default 0)
+ int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
+ int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
char indexName[TSDB_INDEX_NAME_LEN];
char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes.
int32_t exprLen;
@@ -1901,7 +1906,7 @@ typedef struct {
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
- int64_t offset; // use unit by precision of DB
+ int64_t offset; // use unit by precision of DB
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
@@ -2302,7 +2307,7 @@ typedef struct {
} SStreamTaskDeployRsp;
typedef struct {
- SMsgHead head;
+ SStreamExecMsgHead head;
// TODO: other info needed by task
} SStreamTaskExecReq;
diff --git a/include/common/tvariant.h b/include/common/tvariant.h
index 995015fe63..63f305ab2d 100644
--- a/include/common/tvariant.h
+++ b/include/common/tvariant.h
@@ -31,7 +31,7 @@ typedef struct SVariant {
uint64_t u;
double d;
char *pz;
- wchar_t *wpz;
+ TdUcs4 *ucs4;
SArray *arr; // only for 'in' query to hold value list, not value for a field
};
} SVariant;
diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h
index c8df40aedc..b6f42eeee0 100644
--- a/include/libs/qcom/query.h
+++ b/include/libs/qcom/query.h
@@ -49,6 +49,10 @@ typedef struct STableComInfo {
int32_t rowSize; // row size of the schema
} STableComInfo;
+typedef struct SIndexMeta {
+
+} SIndexMeta;
+
/*
* ASSERT(sizeof(SCTableMeta) == 24)
* ASSERT(tableType == TSDB_CHILD_TABLE)
diff --git a/include/os/osFile.h b/include/os/osFile.h
index 209cecedf8..508a522679 100644
--- a/include/os/osFile.h
+++ b/include/os/osFile.h
@@ -22,15 +22,6 @@ extern "C" {
#include "osSocket.h"
-#if defined(WINDOWS)
-typedef int32_t FileFd;
-typedef int32_t SocketFd;
-#else
-typedef int32_t FileFd;
-typedef int32_t SocketFd;
-#endif
-
-int64_t taosRead(FileFd fd, void *buf, int64_t count);
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define open OPEN_FUNC_TAOS_FORBID
@@ -42,6 +33,7 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count);
#define close CLOSE_FUNC_TAOS_FORBID
#define fclose FCLOSE_FUNC_TAOS_FORBID
#define fsync FSYNC_FUNC_TAOS_FORBID
+ #define getline GETLINE_FUNC_TAOS_FORBID
// #define fflush FFLUSH_FUNC_TAOS_FORBID
#endif
@@ -49,15 +41,6 @@ int64_t taosRead(FileFd fd, void *buf, int64_t count);
#define PATH_MAX 256
#endif
-typedef int32_t FileFd;
-
-typedef struct TdFile {
- pthread_rwlock_t rwlock;
- int refId;
- FileFd fd;
- FILE *fp;
-} * TdFilePtr, TdFile;
-
typedef struct TdFile *TdFilePtr;
#define TD_FILE_CTEATE 0x0001
@@ -95,10 +78,6 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count);
void taosFprintfFile(TdFilePtr pFile, const char *format, ...);
-#if defined(WINDOWS)
-#define __restrict__
-#endif // WINDOWS
-
int64_t taosGetLineFile(TdFilePtr pFile, char ** __restrict__ ptrBuf);
int32_t taosEOFFile(TdFilePtr pFile);
@@ -111,15 +90,7 @@ int32_t taosRemoveFile(const char *path);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
-#if defined(_TD_DARWIN_64)
-typedef int32_t SocketFd;
-
-int64_t taosSendFile(SocketFd fdDst, FileFd pFileSrc, int64_t *offset, int64_t size);
-int64_t taosFSendFile(FILE *pFileOut, FILE *pFileIn, int64_t *offset, int64_t size);
-#else
-int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size);
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size);
-#endif
void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length);
bool taosValidFile(TdFilePtr pFile);
diff --git a/include/os/osString.h b/include/os/osString.h
index 88160dd69e..9c6d523ab2 100644
--- a/include/os/osString.h
+++ b/include/os/osString.h
@@ -20,16 +20,28 @@
extern "C" {
#endif
+typedef wchar_t TdWchar;
+typedef int32_t TdUcs4;
+
+// If the error is in a third-party library, place this header file under the third-party library header file.
+#ifndef ALLOW_FORBID_FUNC
+ #define iconv_open ICONV_OPEN_FUNC_TAOS_FORBID
+ #define iconv_close ICONV_CLOSE_FUNC_TAOS_FORBID
+ #define iconv ICONV_FUNC_TAOS_FORBID
+ #define wcwidth WCWIDTH_FUNC_TAOS_FORBID
+ #define wcswidth WCSWIDTH_FUNC_TAOS_FORBID
+ #define mbtowc MBTOWC_FUNC_TAOS_FORBID
+ #define mbstowcs MBSTOWCS_FUNC_TAOS_FORBID
+ #define wctomb WCTOMB_FUNC_TAOS_FORBID
+ #define wcstombs WCSTOMBS_FUNC_TAOS_FORBID
+ #define wcsncpy WCSNCPY_FUNC_TAOS_FORBID
+ #define wchar_t WCHAR_T_FUNC_TAOS_FORBID
+#endif
+
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#define tstrdup(str) _strdup(str)
- #define tstrndup(str, size) _strndup(str, size)
- int32_t tgetline(char **lineptr, size_t *n, FILE *stream);
- int32_t twcslen(const wchar_t *wcs);
#else
#define tstrdup(str) strdup(str)
- #define tstrndup(str, size) strndup(str, size)
- #define tgetline(lineptr, n, stream) getline(lineptr, n, stream)
- #define twcslen wcslen
#endif
#define tstrncpy(dst, src, size) \
@@ -38,14 +50,22 @@ extern "C" {
(dst)[(size)-1] = 0; \
} while (0)
+int32_t taosUcs4len(TdUcs4 *ucs4);
int64_t taosStr2int64(const char *str);
-// USE_LIBICONV
-int32_t taosUcs4ToMbs(void *ucs4, int32_t ucs4_max_len, char *mbs);
-bool taosMbsToUcs4(const char *mbs, size_t mbs_len, char *ucs4, int32_t ucs4_max_len, int32_t *len);
-int32_t tasoUcs4Compare(void *f1_ucs4, void *f2_ucs4, int32_t bytes, int8_t ncharSize);
+int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs);
+bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len);
+int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes);
+TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4);
bool taosValidateEncodec(const char *encodec);
+int32_t taosWcharWidth(TdWchar wchar);
+int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size);
+int32_t taosMbToWchar(TdWchar *pWchar, const char *pStr, int32_t size);
+int32_t taosMbsToWchars(TdWchar *pWchars, const char *pStrs, int32_t size);
+int32_t taosWcharToMb(char *pStr, TdWchar wchar);
+int32_t taosWcharsToMbs(char *pStrs, TdWchar *pWchars, int32_t size);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/util/tcompare.h b/include/util/tcompare.h
index 4c80eeb4f6..cc9e8ae464 100644
--- a/include/util/tcompare.h
+++ b/include/util/tcompare.h
@@ -46,7 +46,7 @@ typedef struct SPatternCompareInfo {
int32_t patternMatch(const char *pattern, const char *str, size_t size, const SPatternCompareInfo *pInfo);
-int32_t WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo);
+int32_t WCSPatternMatch(const TdUcs4 *pattern, const TdUcs4 *str, size_t size, const SPatternCompareInfo *pInfo);
int32_t taosArrayCompareString(const void *a, const void *b);
diff --git a/include/util/tdef.h b/include/util/tdef.h
index 41a61ceb55..47fc619473 100644
--- a/include/util/tdef.h
+++ b/include/util/tdef.h
@@ -41,7 +41,7 @@ extern const int32_t TYPE_BYTES[15];
#define DOUBLE_BYTES sizeof(double)
#define POINTER_BYTES sizeof(void *) // 8 by default assert(sizeof(ptrdiff_t) == sizseof(void*)
#define TSDB_KEYSIZE sizeof(TSKEY)
-#define TSDB_NCHAR_SIZE sizeof(int32_t)
+#define TSDB_NCHAR_SIZE sizeof(TdUcs4)
// NULL definition
#define TSDB_DATA_BOOL_NULL 0x02
@@ -448,6 +448,11 @@ typedef struct {
#define SND_UNIQUE_THREAD_NUM 2
#define SND_SHARED_THREAD_NUM 2
+enum {
+ SND_WORKER_TYPE__SHARED = 1,
+ SND_WORKER_TYPE__UNIQUE,
+};
+
#ifdef __cplusplus
}
#endif
diff --git a/source/common/src/tvariant.c b/source/common/src/tvariant.c
index af6152d3f4..c6aa1cb81d 100644
--- a/source/common/src/tvariant.c
+++ b/source/common/src/tvariant.c
@@ -199,8 +199,8 @@ void taosVariantCreateFromBinary(SVariant *pVar, const char *pz, size_t len, uin
case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length
size_t lenInwchar = len / TSDB_NCHAR_SIZE;
- pVar->wpz = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE);
- memcpy(pVar->wpz, pz, lenInwchar * TSDB_NCHAR_SIZE);
+ pVar->ucs4 = calloc(1, (lenInwchar + 1) * TSDB_NCHAR_SIZE);
+ memcpy(pVar->ucs4, pz, lenInwchar * TSDB_NCHAR_SIZE);
pVar->nLen = (int32_t)len;
break;
@@ -343,7 +343,7 @@ int32_t taosVariantToString(SVariant *pVar, char *dst) {
case TSDB_DATA_TYPE_NCHAR: {
dst[0] = '\'';
- taosUcs4ToMbs(pVar->wpz, (twcslen(pVar->wpz) + 1) * TSDB_NCHAR_SIZE, dst + 1);
+ taosUcs4ToMbs(pVar->ucs4, (taosUcs4len(pVar->ucs4) + 1) * TSDB_NCHAR_SIZE, dst + 1);
int32_t len = (int32_t)strlen(dst);
dst[len] = '\'';
dst[len + 1] = 0;
@@ -384,7 +384,7 @@ static FORCE_INLINE int32_t convertToBoolImpl(char *pStr, int32_t len) {
}
}
-static FORCE_INLINE int32_t wcsconvertToBoolImpl(wchar_t *pstr, int32_t len) {
+static FORCE_INLINE int32_t wcsconvertToBoolImpl(TdUcs4 *pstr, int32_t len) {
if ((wcsncasecmp(pstr, L"true", len) == 0) && (len == 4)) {
return TSDB_TRUE;
} else if (wcsncasecmp(pstr, L"false", len) == 0 && (len == 5)) {
@@ -412,11 +412,11 @@ static int32_t toBinary(SVariant *pVariant, char **pDest, int32_t *pDestSize) {
pBuf = realloc(pBuf, newSize + 1);
}
- taosUcs4ToMbs(pVariant->wpz, (int32_t)newSize, pBuf);
- free(pVariant->wpz);
+ taosUcs4ToMbs(pVariant->ucs4, (int32_t)newSize, pBuf);
+ free(pVariant->ucs4);
pBuf[newSize] = 0;
} else {
- taosUcs4ToMbs(pVariant->wpz, (int32_t)newSize, *pDest);
+ taosUcs4ToMbs(pVariant->ucs4, (int32_t)newSize, *pDest);
}
} else {
@@ -460,8 +460,8 @@ static int32_t toNchar(SVariant *pVariant, char **pDest, int32_t *pDestSize) {
}
if (*pDest == pVariant->pz) {
- wchar_t *pWStr = calloc(1, (nLen + 1) * TSDB_NCHAR_SIZE);
- bool ret = taosMbsToUcs4(pDst, nLen, (char *)pWStr, (nLen + 1) * TSDB_NCHAR_SIZE, NULL);
+ TdUcs4 *pWStr = calloc(1, (nLen + 1) * TSDB_NCHAR_SIZE);
+ bool ret = taosMbsToUcs4(pDst, nLen, pWStr, (nLen + 1) * TSDB_NCHAR_SIZE, NULL);
if (!ret) {
tfree(pWStr);
return -1;
@@ -469,21 +469,21 @@ static int32_t toNchar(SVariant *pVariant, char **pDest, int32_t *pDestSize) {
// free the binary buffer in the first place
if (pVariant->nType == TSDB_DATA_TYPE_BINARY) {
- free(pVariant->wpz);
+ free(pVariant->ucs4);
}
- pVariant->wpz = pWStr;
- *pDestSize = twcslen(pVariant->wpz);
+ pVariant->ucs4 = pWStr;
+ *pDestSize = taosUcs4len(pVariant->ucs4);
// shrink the allocate memory, no need to check here.
- char *tmp = realloc(pVariant->wpz, (*pDestSize + 1) * TSDB_NCHAR_SIZE);
+ char *tmp = realloc(pVariant->ucs4, (*pDestSize + 1) * TSDB_NCHAR_SIZE);
assert(tmp != NULL);
- pVariant->wpz = (wchar_t *)tmp;
+ pVariant->ucs4 = (TdUcs4 *)tmp;
} else {
int32_t output = 0;
- bool ret = taosMbsToUcs4(pDst, nLen, *pDest, (nLen + 1) * TSDB_NCHAR_SIZE, &output);
+ bool ret = taosMbsToUcs4(pDst, nLen, (TdUcs4*)*pDest, (nLen + 1) * TSDB_NCHAR_SIZE, &output);
if (!ret) {
return -1;
}
@@ -554,7 +554,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result
*result = res;
} else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) {
errno = 0;
- wchar_t *endPtr = NULL;
+ TdUcs4 *endPtr = NULL;
SToken token = {0};
token.n = tGetToken(pVariant->pz, &token.type);
@@ -564,7 +564,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result
}
if (token.type == TK_FLOAT) {
- double v = wcstod(pVariant->wpz, &endPtr);
+ double v = wcstod(pVariant->ucs4, &endPtr);
if (releaseVariantPtr) {
free(pVariant->pz);
pVariant->nLen = 0;
@@ -583,7 +583,7 @@ static FORCE_INLINE int32_t convertToInteger(SVariant *pVariant, int64_t *result
setNull((char *)result, type, tDataTypes[type].bytes);
return 0;
} else {
- int64_t val = wcstoll(pVariant->wpz, &endPtr, 10);
+ int64_t val = wcstoll(pVariant->ucs4, &endPtr, 10);
if (releaseVariantPtr) {
free(pVariant->pz);
pVariant->nLen = 0;
@@ -649,7 +649,7 @@ static int32_t convertToBool(SVariant *pVariant, int64_t *pDest) {
*pDest = ret;
} else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) {
int32_t ret = 0;
- if ((ret = wcsconvertToBoolImpl(pVariant->wpz, pVariant->nLen)) < 0) {
+ if ((ret = wcsconvertToBoolImpl(pVariant->ucs4, pVariant->nLen)) < 0) {
return ret;
}
*pDest = ret;
@@ -899,7 +899,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc
return -1;
}
} else {
- wcsncpy((wchar_t *)payload, pVariant->wpz, pVariant->nLen);
+ tasoUcs4Copy((TdUcs4*)payload, pVariant->ucs4, pVariant->nLen);
}
}
} else {
@@ -913,7 +913,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc
return -1;
}
} else {
- memcpy(p, pVariant->wpz, pVariant->nLen);
+ memcpy(p, pVariant->ucs4, pVariant->nLen);
newlen = pVariant->nLen;
}
@@ -979,7 +979,7 @@ int32_t taosVariantTypeSetType(SVariant *pVariant, char type) {
pVariant->d = v;
} else if (pVariant->nType == TSDB_DATA_TYPE_NCHAR) {
errno = 0;
- double v = wcstod(pVariant->wpz, NULL);
+ double v = wcstod(pVariant->ucs4, NULL);
if ((errno == ERANGE && v == -1) || (isinf(v) || isnan(v))) {
free(pVariant->pz);
return -1;
diff --git a/source/dnode/mgmt/impl/inc/dndSnode.h b/source/dnode/mgmt/impl/inc/dndSnode.h
index b21e9191e8..f72d2a137a 100644
--- a/source/dnode/mgmt/impl/inc/dndSnode.h
+++ b/source/dnode/mgmt/impl/inc/dndSnode.h
@@ -24,12 +24,17 @@ extern "C" {
int32_t dndInitSnode(SDnode *pDnode);
void dndCleanupSnode(SDnode *pDnode);
-void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+// void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
+void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+
#ifdef __cplusplus
}
#endif
-#endif /*_TD_DND_SNODE_H_*/
\ No newline at end of file
+#endif /*_TD_DND_SNODE_H_*/
diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c
index 8667952f2c..ea06c8c751 100644
--- a/source/dnode/mgmt/impl/src/dndSnode.c
+++ b/source/dnode/mgmt/impl/src/dndSnode.c
@@ -382,6 +382,12 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
+static FORCE_INLINE int32_t dndGetSWTypeFromMsg(SRpcMsg *pMsg) {
+ SStreamExecMsgHead *pHead = pMsg->pCont;
+ pHead->workerType = htonl(pHead->workerType);
+ return pHead->workerType;
+}
+
static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->streamTaskId = htonl(pHead->streamTaskId);
@@ -450,6 +456,18 @@ void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg);
}
+void dndProcessSnodeExecMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
+ SSnode *pSnode = dndAcquireSnode(pDnode);
+ if (pSnode != NULL) {
+ int32_t workerType = dndGetSWTypeFromMsg(pMsg);
+ if (workerType == SND_WORKER_TYPE__SHARED) {
+ dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg);
+ } else {
+ dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
+ }
+ }
+}
+
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
}
diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c
index 15db36477d..617b6c0fc3 100644
--- a/source/dnode/mgmt/impl/src/dndTransport.c
+++ b/source/dnode/mgmt/impl/src/dndTransport.c
@@ -23,10 +23,11 @@
#include "dndTransport.h"
#include "dndMgmt.h"
#include "dndMnode.h"
+#include "dndSnode.h"
#include "dndVnodes.h"
-#define INTERNAL_USER "_dnd"
-#define INTERNAL_CKEY "_key"
+#define INTERNAL_USER "_dnd"
+#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
static void dndInitMsgFp(STransMgmt *pMgmt) {
@@ -153,10 +154,14 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
+
+ // Requests handled by SNODE
+ pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_DEPLOY)] = dndProcessSnodeMgmtMsg;
+ pMgmt->msgFp[TMSG_INDEX(TDMT_SND_TASK_EXEC)] = dndProcessSnodeExecMsg;
}
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
- SDnode * pDnode = parent;
+ SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pRsp->msgType;
@@ -219,7 +224,7 @@ static void dndCleanupClient(SDnode *pDnode) {
}
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
- SDnode * pDnode = param;
+ SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pReq->msgType;
@@ -313,7 +318,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
- void * pReq = rpcMallocCont(contLen);
+ void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 97c52f44eb..6b2857c411 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -3386,7 +3386,7 @@ void filterPrepare(void* expr, void* param) {
if (size < (uint32_t)pSchema->bytes) {
size = pSchema->bytes;
}
- // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(wchar_t) space.
+ // to make sure tonchar does not cause invalid write, since the '\0' needs at least sizeof(TdUcs4) space.
pInfo->q = calloc(1, size + TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE);
tVariantDump(pCond, pInfo->q, pSchema->type, true);
}
diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c
index 80a0b3554c..08e0ac55d7 100644
--- a/source/libs/catalog/src/catalog.c
+++ b/source/libs/catalog/src/catalog.c
@@ -2349,6 +2349,9 @@ _return:
CTG_API_LEAVE(code);
}
+int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) {
+
+}
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
CTG_API_ENTER();
@@ -2394,6 +2397,9 @@ _return:
CTG_API_LEAVE(code);
}
+int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, const char *pIndexName, SIndexMeta** pIndexMeta) {
+
+}
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
CTG_API_ENTER();
@@ -2662,12 +2668,15 @@ _return:
int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
CTG_API_ENTER();
-
+
+ int32_t code = 0;
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
- //TODO
+ CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, &pQnodeList));
+
+_return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c
index 6db3abb9d6..a37820634f 100644
--- a/source/libs/parser/src/parInsert.c
+++ b/source/libs/parser/src/parInsert.c
@@ -622,7 +622,7 @@ static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* p
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
const char* rowEnd = tdRowEnd(rb->pBuf);
- if (!taosMbsToUcs4(value, len, (char*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
+ if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
varDataSetLen(rowEnd, output);
@@ -725,7 +725,7 @@ static int32_t KvRowAppend(const void *value, int32_t len, void *param) {
} else if (TSDB_DATA_TYPE_NCHAR == type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
- if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
+ if (!taosMbsToUcs4(value, len, (TdUcs4*)varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp
index 65d3f51dde..1f333e49e7 100644
--- a/source/libs/parser/test/mockCatalogService.cpp
+++ b/source/libs/parser/test/mockCatalogService.cpp
@@ -13,12 +13,11 @@
* along with this program. If not, see .
*/
-#include "mockCatalogService.h"
-
#include
#include
#include