Merge branch '3.0' of https://github.com/taosdata/TDengine into perf/row_iter_optimize
This commit is contained in:
commit
df69e060b2
|
@ -852,6 +852,7 @@ typedef struct {
|
||||||
int16_t hashSuffix;
|
int16_t hashSuffix;
|
||||||
int8_t hashMethod;
|
int8_t hashMethod;
|
||||||
SArray* pVgroupInfos; // Array of SVgroupInfo
|
SArray* pVgroupInfos; // Array of SVgroupInfo
|
||||||
|
int32_t errCode;
|
||||||
} SUseDbRsp;
|
} SUseDbRsp;
|
||||||
|
|
||||||
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
|
int32_t tSerializeSUseDbRsp(void* buf, int32_t bufLen, const SUseDbRsp* pRsp);
|
||||||
|
@ -3133,7 +3134,8 @@ int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
|
||||||
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
|
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t uid;
|
// int64_t uid;
|
||||||
|
char tbname[TSDB_TABLE_NAME_LEN];
|
||||||
int64_t ts;
|
int64_t ts;
|
||||||
} SSingleDeleteReq;
|
} SSingleDeleteReq;
|
||||||
|
|
||||||
|
|
|
@ -317,6 +317,8 @@ typedef struct SStreamTask {
|
||||||
int8_t inputStatus;
|
int8_t inputStatus;
|
||||||
int8_t outputStatus;
|
int8_t outputStatus;
|
||||||
|
|
||||||
|
// STaosQueue* inputQueue1;
|
||||||
|
// STaosQall* inputQall;
|
||||||
SStreamQueue* inputQueue;
|
SStreamQueue* inputQueue;
|
||||||
SStreamQueue* outputQueue;
|
SStreamQueue* outputQueue;
|
||||||
|
|
||||||
|
|
|
@ -154,7 +154,7 @@ int32_t* taosGetErrno();
|
||||||
|
|
||||||
// mnode-sdb
|
// mnode-sdb
|
||||||
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0320)
|
#define TSDB_CODE_SDB_OBJ_ALREADY_THERE TAOS_DEF_ERROR_CODE(0, 0x0320)
|
||||||
#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0321)
|
#define TSDB_CODE_SDB_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0321) // internal
|
||||||
#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0322)
|
#define TSDB_CODE_SDB_INVALID_TABLE_TYPE TAOS_DEF_ERROR_CODE(0, 0x0322)
|
||||||
#define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0323)
|
#define TSDB_CODE_SDB_OBJ_NOT_THERE TAOS_DEF_ERROR_CODE(0, 0x0323)
|
||||||
#define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325)
|
#define TSDB_CODE_SDB_INVALID_KEY_TYPE TAOS_DEF_ERROR_CODE(0, 0x0325)
|
||||||
|
@ -218,14 +218,24 @@ int32_t* taosGetErrno();
|
||||||
|
|
||||||
// mnode-db
|
// mnode-db
|
||||||
#define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380)
|
#define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380)
|
||||||
#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381)
|
#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //
|
||||||
#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0382)
|
#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0382) //
|
||||||
#define TSDB_CODE_MND_INVALID_DB TAOS_DEF_ERROR_CODE(0, 0x0383)
|
#define TSDB_CODE_MND_INVALID_DB TAOS_DEF_ERROR_CODE(0, 0x0383)
|
||||||
|
// #define TSDB_CODE_MND_MONITOR_DB_FORBIDDEN TAOS_DEF_ERROR_CODE(0, 0x0384) // 2.x
|
||||||
#define TSDB_CODE_MND_TOO_MANY_DATABASES TAOS_DEF_ERROR_CODE(0, 0x0385)
|
#define TSDB_CODE_MND_TOO_MANY_DATABASES TAOS_DEF_ERROR_CODE(0, 0x0385)
|
||||||
#define TSDB_CODE_MND_DB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0388)
|
#define TSDB_CODE_MND_DB_IN_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0386) //
|
||||||
#define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0389)
|
// #define TSDB_CODE_MND_VGROUP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0387) // 2.x
|
||||||
#define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x038A)
|
#define TSDB_CODE_MND_DB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0388) //
|
||||||
|
#define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0389) // internal
|
||||||
|
#define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x038A) //
|
||||||
#define TSDB_CODE_MND_DB_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x038B)
|
#define TSDB_CODE_MND_DB_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x038B)
|
||||||
|
// #define TSDB_CODE_MND_INVALID_DB_OPTION_DAYS TAOS_DEF_ERROR_CODE(0, 0x0390) // 2.x
|
||||||
|
// #define TSDB_CODE_MND_INVALID_DB_OPTION_KEEP TAOS_DEF_ERROR_CODE(0, 0x0391) // 2.x
|
||||||
|
// #define TSDB_CODE_MND_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x0392) // 2.x
|
||||||
|
// #define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x0393) // 2.x
|
||||||
|
// #define TSDB_CODE_MND_INVALID_TOPIC_PARTITONSTAOS_DEF_ERROR_CODE(0, 0x0394) // 2.x
|
||||||
|
// #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0395) // 2.x
|
||||||
|
#define TSDB_CODE_MND_DB_IN_CREATING TAOS_DEF_ERROR_CODE(0, 0x0396) //
|
||||||
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A)
|
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A)
|
||||||
|
|
||||||
// mnode-node
|
// mnode-node
|
||||||
|
|
|
@ -39,7 +39,7 @@ if not exist %work_dir%\debug\ver-%2-x86 (
|
||||||
md %work_dir%\debug\ver-%2-x86
|
md %work_dir%\debug\ver-%2-x86
|
||||||
)
|
)
|
||||||
cd %work_dir%\debug\ver-%2-x64
|
cd %work_dir%\debug\ver-%2-x64
|
||||||
call vcvarsall.bat x64
|
rem #call vcvarsall.bat x64
|
||||||
cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DWEBSOCKET=true -DBUILD_HTTP=false -DBUILD_TEST=false -DVERNUMBER=%2 -DCPUTYPE=x64
|
cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DWEBSOCKET=true -DBUILD_HTTP=false -DBUILD_TEST=false -DVERNUMBER=%2 -DCPUTYPE=x64
|
||||||
cmake --build .
|
cmake --build .
|
||||||
rd /s /Q C:\TDengine
|
rd /s /Q C:\TDengine
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
@echo off
|
@echo off
|
||||||
|
|
||||||
|
for /F %%a in ('echo prompt $E ^| cmd') do set "ESC=%%a"
|
||||||
|
|
||||||
goto %1
|
goto %1
|
||||||
:needAdmin
|
:needAdmin
|
||||||
|
|
||||||
|
@ -11,60 +14,94 @@ set binary_dir=%3
|
||||||
set binary_dir=%binary_dir:/=\\%
|
set binary_dir=%binary_dir:/=\\%
|
||||||
set osType=%4
|
set osType=%4
|
||||||
set verNumber=%5
|
set verNumber=%5
|
||||||
set tagert_dir=C:\\TDengine
|
set target_dir=C:\\TDengine
|
||||||
|
|
||||||
if not exist %tagert_dir% (
|
if not exist %target_dir% (
|
||||||
mkdir %tagert_dir%
|
mkdir %target_dir%
|
||||||
)
|
)
|
||||||
if not exist %tagert_dir%\\cfg (
|
if not exist %target_dir%\\cfg (
|
||||||
mkdir %tagert_dir%\\cfg
|
mkdir %target_dir%\\cfg
|
||||||
)
|
)
|
||||||
if not exist %tagert_dir%\\include (
|
if not exist %target_dir%\\include (
|
||||||
mkdir %tagert_dir%\\include
|
mkdir %target_dir%\\include
|
||||||
)
|
)
|
||||||
if not exist %tagert_dir%\\driver (
|
if not exist %target_dir%\\driver (
|
||||||
mkdir %tagert_dir%\\driver
|
mkdir %target_dir%\\driver
|
||||||
)
|
)
|
||||||
if not exist C:\\TDengine\\cfg\\taos.cfg (
|
if not exist C:\\TDengine\\cfg\\taos.cfg (
|
||||||
copy %source_dir%\\packaging\\cfg\\taos.cfg %tagert_dir%\\cfg\\taos.cfg > nul
|
copy %source_dir%\\packaging\\cfg\\taos.cfg %target_dir%\\cfg\\taos.cfg > nul
|
||||||
)
|
)
|
||||||
|
|
||||||
if exist %binary_dir%\\test\\cfg\\taosadapter.toml (
|
if exist %binary_dir%\\test\\cfg\\taosadapter.toml (
|
||||||
if not exist %tagert_dir%\\cfg\\taosadapter.toml (
|
if not exist %target_dir%\\cfg\\taosadapter.toml (
|
||||||
copy %binary_dir%\\test\\cfg\\taosadapter.toml %tagert_dir%\\cfg\\taosadapter.toml > nul
|
copy %binary_dir%\\test\\cfg\\taosadapter.toml %target_dir%\\cfg\\taosadapter.toml > nul
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
copy %source_dir%\\include\\client\\taos.h %tagert_dir%\\include > nul
|
copy %source_dir%\\include\\client\\taos.h %target_dir%\\include > nul
|
||||||
copy %source_dir%\\include\\util\\taoserror.h %tagert_dir%\\include > nul
|
copy %source_dir%\\include\\util\\taoserror.h %target_dir%\\include > nul
|
||||||
copy %source_dir%\\include\\libs\\function\\taosudf.h %tagert_dir%\\include > nul
|
copy %source_dir%\\include\\libs\\function\\taosudf.h %target_dir%\\include > nul
|
||||||
copy %binary_dir%\\build\\lib\\taos.lib %tagert_dir%\\driver > nul
|
copy %binary_dir%\\build\\lib\\taos.lib %target_dir%\\driver > nul
|
||||||
copy %binary_dir%\\build\\lib\\taos_static.lib %tagert_dir%\\driver > nul
|
copy %binary_dir%\\build\\lib\\taos_static.lib %target_dir%\\driver > nul
|
||||||
copy %binary_dir%\\build\\lib\\taos.dll %tagert_dir%\\driver > nul
|
copy %binary_dir%\\build\\lib\\taos.dll %target_dir%\\driver > nul
|
||||||
copy %binary_dir%\\build\\bin\\taos.exe %tagert_dir% > nul
|
copy %binary_dir%\\build\\bin\\taos.exe %target_dir% > nul
|
||||||
copy %binary_dir%\\build\\bin\\taosd.exe %tagert_dir% > nul
|
|
||||||
copy %binary_dir%\\build\\bin\\udfd.exe %tagert_dir% > nul
|
|
||||||
if exist %binary_dir%\\build\\bin\\taosBenchmark.exe (
|
if exist %binary_dir%\\build\\bin\\taosBenchmark.exe (
|
||||||
copy %binary_dir%\\build\\bin\\taosBenchmark.exe %tagert_dir% > nul
|
copy %binary_dir%\\build\\bin\\taosBenchmark.exe %target_dir% > nul
|
||||||
)
|
)
|
||||||
if exist %binary_dir%\\build\\lib\\taosws.dll.lib (
|
if exist %binary_dir%\\build\\lib\\taosws.dll.lib (
|
||||||
copy %binary_dir%\\build\\lib\\taosws.dll.lib %tagert_dir%\\driver > nul
|
copy %binary_dir%\\build\\lib\\taosws.dll.lib %target_dir%\\driver > nul
|
||||||
)
|
)
|
||||||
if exist %binary_dir%\\build\\lib\\taosws.dll (
|
if exist %binary_dir%\\build\\lib\\taosws.dll (
|
||||||
copy %binary_dir%\\build\\lib\\taosws.dll %tagert_dir%\\driver > nul
|
copy %binary_dir%\\build\\lib\\taosws.dll %target_dir%\\driver > nul
|
||||||
copy %source_dir%\\tools\\taosws-rs\\target\\release\\taosws.h %tagert_dir%\\include > nul
|
copy %source_dir%\\tools\\taosws-rs\\target\\release\\taosws.h %target_dir%\\include > nul
|
||||||
)
|
)
|
||||||
if exist %binary_dir%\\build\\bin\\taosdump.exe (
|
if exist %binary_dir%\\build\\bin\\taosdump.exe (
|
||||||
copy %binary_dir%\\build\\bin\\taosdump.exe %tagert_dir% > nul
|
copy %binary_dir%\\build\\bin\\taosdump.exe %target_dir% > nul
|
||||||
)
|
|
||||||
if exist %binary_dir%\\build\\bin\\taosadapter.exe (
|
|
||||||
copy %binary_dir%\\build\\bin\\taosadapter.exe %tagert_dir% > nul
|
|
||||||
)
|
)
|
||||||
|
|
||||||
mshta vbscript:createobject("shell.application").shellexecute("%~s0",":hasAdmin","","runas",1)(window.close)&& echo To start/stop TDengine with administrator privileges: sc start/stop taosd &goto :eof
|
copy %binary_dir%\\build\\bin\\taosd.exe %target_dir% > nul
|
||||||
|
copy %binary_dir%\\build\\bin\\udfd.exe %target_dir% > nul
|
||||||
|
|
||||||
|
if exist %binary_dir%\\build\\bin\\taosadapter.exe (
|
||||||
|
copy %binary_dir%\\build\\bin\\taosadapter.exe %target_dir% > nul
|
||||||
|
)
|
||||||
|
|
||||||
|
mshta vbscript:createobject("shell.application").shellexecute("%~s0",":hasAdmin","","runas",1)(window.close)
|
||||||
|
|
||||||
|
echo.
|
||||||
|
echo Please manually remove C:\TDengine from your system PATH environment after you remove TDengine software
|
||||||
|
echo.
|
||||||
|
echo To start/stop TDengine with administrator privileges: %ESC%[92msc start/stop taosd %ESC%[0m
|
||||||
|
|
||||||
|
if exist %binary_dir%\\build\\bin\\taosadapter.exe (
|
||||||
|
echo To start/stop taosAdapter with administrator privileges: %ESC%[92msc start/stop taosadapter %ESC%[0m
|
||||||
|
)
|
||||||
|
|
||||||
|
goto :eof
|
||||||
|
|
||||||
:hasAdmin
|
:hasAdmin
|
||||||
|
|
||||||
|
sc query "taosd" && sc stop taosd && sc delete taosd
|
||||||
|
sc query "taosadapter" && sc stop taosadapter && sc delete taosd
|
||||||
|
|
||||||
copy /y C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32 > nul
|
copy /y C:\\TDengine\\driver\\taos.dll C:\\Windows\\System32 > nul
|
||||||
if exist C:\\TDengine\\driver\\taosws.dll (
|
if exist C:\\TDengine\\driver\\taosws.dll (
|
||||||
copy /y C:\\TDengine\\driver\\taosws.dll C:\\Windows\\System32 > nul
|
copy /y C:\\TDengine\\driver\\taosws.dll C:\\Windows\\System32 > nul
|
||||||
)
|
)
|
||||||
|
|
||||||
sc query "taosd" >nul || sc create "taosd" binPath= "C:\\TDengine\\taosd.exe --win_service" start= DEMAND
|
sc query "taosd" >nul || sc create "taosd" binPath= "C:\\TDengine\\taosd.exe --win_service" start= DEMAND
|
||||||
sc query "taosadapter" >nul || sc create "taosadapter" binPath= "C:\\TDengine\\taosadapter.exe" start= DEMAND
|
sc query "taosadapter" >nul || sc create "taosadapter" binPath= "C:\\TDengine\\taosadapter.exe" start= DEMAND
|
||||||
|
|
||||||
|
set "env=HKLM\System\CurrentControlSet\Control\Session Manager\Environment"
|
||||||
|
for /f "tokens=2*" %%I in ('reg query "%env%" /v Path ^| findstr /i "\<Path\>"') do (
|
||||||
|
|
||||||
|
rem // make addition persistent through reboots
|
||||||
|
reg add "%env%" /f /v Path /t REG_EXPAND_SZ /d "%%J;C:\TDengine"
|
||||||
|
|
||||||
|
rem // apply change to the current process
|
||||||
|
for %%a in ("%%J;C:\TDengine") do path %%~a
|
||||||
|
)
|
||||||
|
|
||||||
|
rem // use setx to set a temporary throwaway value to trigger a WM_SETTINGCHANGE
|
||||||
|
rem // applies change to new console windows without requiring a reboot
|
||||||
|
(setx /m foo bar & reg delete "%env%" /f /v foo) >NUL 2>NUL
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,29 @@ Source: {#MyAppSourceDir}{#MyAppIncludeName}; DestDir: "{app}\include"; Flags: i
|
||||||
Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExcludeSource} ; Flags: igNoreversion recursesubdirs createallsubdirs
|
Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExcludeSource} ; Flags: igNoreversion recursesubdirs createallsubdirs
|
||||||
Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs
|
Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs
|
||||||
|
|
||||||
|
|
||||||
|
[Registry]
|
||||||
|
Root: HKLM; Subkey: "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"; \
|
||||||
|
ValueType: expandsz; ValueName: "Path"; ValueData: "{olddata};C:\TDengine"; \
|
||||||
|
Check: NeedsAddPath('C:\TDengine')
|
||||||
|
|
||||||
|
[Code]
|
||||||
|
function NeedsAddPath(Param: string): boolean;
|
||||||
|
var
|
||||||
|
OrigPath: string;
|
||||||
|
begin
|
||||||
|
if not RegQueryStringValue(HKEY_LOCAL_MACHINE,
|
||||||
|
'SYSTEM\CurrentControlSet\Control\Session Manager\Environment',
|
||||||
|
'Path', OrigPath)
|
||||||
|
then begin
|
||||||
|
Result := True;
|
||||||
|
exit;
|
||||||
|
end;
|
||||||
|
{ look for the path with leading and trailing semicolon }
|
||||||
|
{ Pos() returns 0 if not found }
|
||||||
|
Result := Pos(';' + Param + ';', ';' + OrigPath + ';') = 0;
|
||||||
|
end;
|
||||||
|
|
||||||
[UninstallDelete]
|
[UninstallDelete]
|
||||||
Name: {app}\driver; Type: filesandordirs
|
Name: {app}\driver; Type: filesandordirs
|
||||||
Name: {app}\connector; Type: filesandordirs
|
Name: {app}\connector; Type: filesandordirs
|
||||||
|
|
|
@ -175,7 +175,8 @@ int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
SRequestObj* pRequest = param;
|
SRequestObj* pRequest = param;
|
||||||
|
|
||||||
if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
|
if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
|
||||||
|
TSDB_CODE_MND_DB_IN_DROPPING == code) {
|
||||||
SUseDbRsp usedbRsp = {0};
|
SUseDbRsp usedbRsp = {0};
|
||||||
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
|
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
|
||||||
struct SCatalog* pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
|
@ -212,7 +213,11 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
|
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
|
||||||
|
|
||||||
if (strlen(usedbRsp.db) == 0) {
|
if (strlen(usedbRsp.db) == 0) {
|
||||||
return TSDB_CODE_MND_DB_NOT_EXIST;
|
if (usedbRsp.errCode != 0) {
|
||||||
|
return usedbRsp.errCode;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
|
|
|
@ -1649,7 +1649,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
|
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
|
pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
@ -1667,7 +1667,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
return pRsp;
|
return pRsp;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
|
tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
|
||||||
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2489,6 +2489,7 @@ int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) {
|
||||||
if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1;
|
if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tEncodeI32(pEncoder, pRsp->errCode) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2553,6 +2554,7 @@ int32_t tDeserializeSUseDbRspImp(SDecoder *pDecoder, SUseDbRsp *pRsp) {
|
||||||
taosArrayPush(pRsp->pVgroupInfos, &vgInfo);
|
taosArrayPush(pRsp->pVgroupInfos, &vgInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tDecodeI32(pDecoder, &pRsp->errCode) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6129,13 +6131,13 @@ void tDeleteSTaosxRsp(STaosxRsp *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
|
int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) {
|
||||||
if (tEncodeI64(pEncoder, pReq->uid) < 0) return -1;
|
if (tEncodeCStr(pEncoder, pReq->tbname) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) {
|
int32_t tDecodeSSingleDeleteReq(SDecoder *pDecoder, SSingleDeleteReq *pReq) {
|
||||||
if (tDecodeI64(pDecoder, &pReq->uid) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pReq->tbname) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,8 +285,17 @@ static inline int32_t mndGetGlobalVgroupVersion(SMnode *pMnode) {
|
||||||
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) {
|
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db);
|
SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db);
|
||||||
if (pDb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
if (pDb == NULL) {
|
||||||
|
if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||||
|
} else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
|
||||||
|
terrno = TSDB_CODE_MND_DB_IN_CREATING;
|
||||||
|
} else if (terrno == TSDB_CODE_SDB_OBJ_DROPPING) {
|
||||||
|
terrno = TSDB_CODE_MND_DB_IN_DROPPING;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_APP_ERROR;
|
||||||
|
mFatal("db:%s, failed to acquire db since %s", db, terrstr());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return pDb;
|
return pDb;
|
||||||
}
|
}
|
||||||
|
@ -594,7 +603,8 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
|
||||||
terrno = TSDB_CODE_MND_DB_ALREADY_EXIST;
|
terrno = TSDB_CODE_MND_DB_ALREADY_EXIST;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
} else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
|
} else {
|
||||||
|
if (terrno == TSDB_CODE_MND_DB_IN_CREATING) {
|
||||||
if (mndSetRpcInfoForDbTrans(pMnode, pReq, MND_OPER_CREATE_DB, createReq.db) == 0) {
|
if (mndSetRpcInfoForDbTrans(pMnode, pReq, MND_OPER_CREATE_DB, createReq.db) == 0) {
|
||||||
mInfo("db:%s, is creating and response after trans finished", createReq.db);
|
mInfo("db:%s, is creating and response after trans finished", createReq.db);
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
@ -602,8 +612,13 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
|
||||||
} else {
|
} else {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
} else if (terrno != TSDB_CODE_MND_DB_NOT_EXIST) {
|
} else if (terrno == TSDB_CODE_MND_DB_IN_DROPPING) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
} else if (terrno == TSDB_CODE_MND_DB_NOT_EXIST) {
|
||||||
|
// continue
|
||||||
|
} else { // TSDB_CODE_APP_ERROR
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
|
||||||
|
@ -786,7 +801,6 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
pDb = mndAcquireDb(pMnode, alterReq.db);
|
pDb = mndAcquireDb(pMnode, alterReq.db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -836,7 +850,6 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
pDb = mndAcquireDb(pMnode, cfgReq.db);
|
pDb = mndAcquireDb(pMnode, cfgReq.db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1066,11 +1079,8 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
if (dropReq.ignoreNotExists) {
|
if (dropReq.ignoreNotExists) {
|
||||||
code = mndBuildDropDbRsp(pDb, &pReq->info.rspLen, &pReq->info.rsp, true);
|
code = mndBuildDropDbRsp(pDb, &pReq->info.rspLen, &pReq->info.rsp, true);
|
||||||
goto _OVER;
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) {
|
if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) {
|
||||||
|
@ -1197,10 +1207,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
|
||||||
int32_t vgVersion = mndGetGlobalVgroupVersion(pMnode);
|
int32_t vgVersion = mndGetGlobalVgroupVersion(pMnode);
|
||||||
if (usedbReq.vgVersion < vgVersion) {
|
if (usedbReq.vgVersion < vgVersion) {
|
||||||
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
|
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
|
||||||
if (usedbRsp.pVgroupInfos == NULL) {
|
if (usedbRsp.pVgroupInfos == NULL) goto _OVER;
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
|
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
|
||||||
usedbRsp.vgVersion = vgVersion++;
|
usedbRsp.vgVersion = vgVersion++;
|
||||||
|
@ -1209,16 +1216,13 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
|
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
// no jump, need to construct rsp
|
|
||||||
} else {
|
} else {
|
||||||
pDb = mndAcquireDb(pMnode, usedbReq.db);
|
pDb = mndAcquireDb(pMnode, usedbReq.db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
|
||||||
|
|
||||||
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
|
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
|
||||||
usedbRsp.uid = usedbReq.dbId;
|
usedbRsp.uid = usedbReq.dbId;
|
||||||
usedbRsp.vgVersion = usedbReq.vgVersion;
|
usedbRsp.vgVersion = usedbReq.vgVersion;
|
||||||
|
usedbRsp.errCode = terrno;
|
||||||
|
|
||||||
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
|
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -287,9 +287,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
|
memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN);
|
||||||
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB);
|
||||||
if (pSourceDb == NULL) {
|
if (pSourceDb == NULL) {
|
||||||
/*ASSERT(0);*/
|
mInfo("stream:%s failed to create, source db %s not exist since %s", pCreate->name, pObj->sourceDb, terrstr());
|
||||||
mInfo("stream:%s failed to create, source db %s not exist", pCreate->name, pObj->sourceDb);
|
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pObj->sourceDbUid = pSourceDb->uid;
|
pObj->sourceDbUid = pSourceDb->uid;
|
||||||
|
@ -298,8 +296,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
||||||
|
|
||||||
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
|
SDbObj *pTargetDb = mndAcquireDbByStb(pMnode, pObj->targetSTbName);
|
||||||
if (pTargetDb == NULL) {
|
if (pTargetDb == NULL) {
|
||||||
mInfo("stream:%s failed to create, target db %s not exist", pCreate->name, pObj->targetDb);
|
mInfo("stream:%s failed to create, target db %s not exist since %s", pCreate->name, pObj->targetDb, terrstr());
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(pObj->targetDb, pTargetDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
|
|
@ -25,6 +25,8 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
||||||
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||||
|
|
||||||
|
tqDebug("stream delete msg: row %d", totRow);
|
||||||
|
|
||||||
for (int32_t row = 0; row < totRow; row++) {
|
for (int32_t row = 0; row < totRow; row++) {
|
||||||
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
|
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
|
||||||
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
|
int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
|
||||||
|
@ -36,11 +38,14 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
||||||
} else {
|
} else {
|
||||||
name = buildCtbNameByGroupId(stbFullName, groupId);
|
name = buildCtbNameByGroupId(stbFullName, groupId);
|
||||||
}
|
}
|
||||||
tqDebug("stream delete msg: groupId :%" PRId64 ", name: %s", groupId, name);
|
tqDebug("stream delete msg: vgId:%d, groupId :%" PRId64 ", name: %s, ts:%" PRId64, pVnode->config.vgId, groupId,
|
||||||
|
name, ts);
|
||||||
|
#if 0
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||||
if (metaGetTableEntryByName(&mr, name) < 0) {
|
if (metaGetTableEntryByName(&mr, name) < 0) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
tqDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
|
||||||
taosMemoryFree(name);
|
taosMemoryFree(name);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -48,10 +53,13 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
|
||||||
int64_t uid = mr.me.uid;
|
int64_t uid = mr.me.uid;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosMemoryFree(name);
|
taosMemoryFree(name);
|
||||||
|
#endif
|
||||||
SSingleDeleteReq req = {
|
SSingleDeleteReq req = {
|
||||||
.ts = ts,
|
.ts = ts,
|
||||||
.uid = uid,
|
|
||||||
};
|
};
|
||||||
|
strncpy(req.tbname, name, TSDB_TABLE_NAME_LEN);
|
||||||
|
taosMemoryFree(name);
|
||||||
|
/*tqDebug("stream delete msg, active: vgId:%d, ts:%" PRId64 " name:%s", pVnode->config.vgId, ts, name);*/
|
||||||
taosArrayPush(deleteReq->deleteReqs, &req);
|
taosArrayPush(deleteReq->deleteReqs, &req);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -309,6 +317,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
||||||
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
|
||||||
deleteReq.suid = suid;
|
deleteReq.suid = suid;
|
||||||
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
|
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
|
||||||
|
if (taosArrayGetSize(deleteReq.deleteReqs) == 0) {
|
||||||
|
taosArrayDestroy(deleteReq.deleteReqs);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
|
@ -117,8 +117,8 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
|
||||||
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info);
|
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info);
|
||||||
}
|
}
|
||||||
if (pMsgIter->sversion != info.skmVer) {
|
if (pMsgIter->sversion != info.skmVer) {
|
||||||
tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64,
|
tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, TD_VID(pTsdb->pVnode),
|
||||||
TD_VID(pTsdb->pVnode), pMsgIter->sversion, info.skmVer, suid, uid);
|
pMsgIter->sversion, info.skmVer, suid, uid);
|
||||||
code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
|
code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -198,14 +198,14 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
tsdbInfo("vgId:%d, delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||||
" since %s",
|
" at version %" PRId64 " since %s",
|
||||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
tsdbError("vgId:%d, failed to delete data from table suid:%" PRId64 " uid:%" PRId64 " skey:%" PRId64 " eKey:%" PRId64
|
||||||
" since %s",
|
" at version %" PRId64 " since %s",
|
||||||
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, tstrerror(code));
|
TD_VID(pTsdb->pVnode), suid, uid, sKey, eKey, version, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1169,16 +1169,28 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void
|
||||||
tDecoderInit(&decoder, pReq, len);
|
tDecoderInit(&decoder, pReq, len);
|
||||||
tDecodeSBatchDeleteReq(&decoder, &deleteReq);
|
tDecodeSBatchDeleteReq(&decoder, &deleteReq);
|
||||||
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
|
int32_t sz = taosArrayGetSize(deleteReq.deleteReqs);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
|
SSingleDeleteReq *pOneReq = taosArrayGet(deleteReq.deleteReqs, i);
|
||||||
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts);
|
char *name = pOneReq->tbname;
|
||||||
|
if (metaGetTableEntryByName(&mr, name) < 0) {
|
||||||
|
vDebug("stream delete msg, skip vgId:%d since no table: %s", pVnode->config.vgId, name);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t uid = mr.me.uid;
|
||||||
|
|
||||||
|
int32_t code = tsdbDeleteTableData(pVnode->pTsdb, version, deleteReq.suid, uid, pOneReq->ts, pOneReq->ts);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
|
vError("vgId:%d, delete error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64 ", end ts:%" PRId64,
|
||||||
TD_VID(pVnode), terrstr(), deleteReq.suid, pOneReq->uid, pOneReq->ts, pOneReq->ts);
|
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->ts, pOneReq->ts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
metaReaderClear(&mr);
|
||||||
taosArrayDestroy(deleteReq.deleteReqs);
|
taosArrayDestroy(deleteReq.deleteReqs);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -538,7 +538,8 @@ typedef struct SCtgOperation {
|
||||||
(sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
|
(sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
|
||||||
|
|
||||||
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
|
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST)
|
||||||
#define CTG_DB_NOT_EXIST(code) (code == TSDB_CODE_MND_DB_NOT_EXIST)
|
#define CTG_DB_NOT_EXIST(code) \
|
||||||
|
(code == TSDB_CODE_MND_DB_NOT_EXIST || code == TSDB_CODE_MND_DB_IN_CREATING || code == TSDB_CODE_MND_DB_IN_DROPPING)
|
||||||
|
|
||||||
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
|
#define ctgFatal(param, ...) qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||||
#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
|
#define ctgError(param, ...) qError("CTG:%p " param, pCtg, __VA_ARGS__)
|
||||||
|
|
|
@ -184,7 +184,6 @@ enum {
|
||||||
typedef struct SOperatorFpSet {
|
typedef struct SOperatorFpSet {
|
||||||
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
||||||
__optr_fn_t getNextFn;
|
__optr_fn_t getNextFn;
|
||||||
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it
|
|
||||||
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
||||||
__optr_close_fn_t closeFn;
|
__optr_close_fn_t closeFn;
|
||||||
__optr_encode_fn_t encodeResultRow;
|
__optr_encode_fn_t encodeResultRow;
|
||||||
|
|
|
@ -114,7 +114,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
|
||||||
SOperatorFpSet fpSet = {
|
SOperatorFpSet fpSet = {
|
||||||
._openFn = openFn,
|
._openFn = openFn,
|
||||||
.getNextFn = nextFn,
|
.getNextFn = nextFn,
|
||||||
.getStreamResFn = streamFn,
|
|
||||||
.cleanupFn = cleanup,
|
.cleanupFn = cleanup,
|
||||||
.closeFn = closeFn,
|
.closeFn = closeFn,
|
||||||
.getExplainFn = explain,
|
.getExplainFn = explain,
|
||||||
|
|
|
@ -1691,14 +1691,14 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
|
||||||
|
|
||||||
pInfo->srcRowIndex = 0;
|
pInfo->srcRowIndex = 0;
|
||||||
|
|
||||||
pOperator->name = "FillOperator";
|
pOperator->name = "StreamFillOperator";
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo,
|
pOperator->fpSet =
|
||||||
NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -12,8 +12,8 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include "filter.h"
|
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
|
@ -1255,9 +1255,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SSDataBlock* pBlock = pInfo->binfo.pRes;
|
SSDataBlock* pBlock = pInfo->binfo.pRes;
|
||||||
|
|
||||||
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
ASSERT(pInfo->execModel == OPTR_EXEC_MODEL_BATCH);
|
||||||
return pOperator->fpSet.getStreamResFn(pOperator);
|
|
||||||
} else {
|
|
||||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1284,7 +1283,6 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
return (rows == 0) ? NULL : pBlock;
|
return (rows == 0) ? NULL : pBlock;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
|
static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type) {
|
||||||
for (int i = 0; i < num; i++) {
|
for (int i = 0; i < num; i++) {
|
||||||
|
|
|
@ -2181,7 +2181,8 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge
|
||||||
|
|
||||||
if (TSDB_DB_NAME_T == pTargetName->type) {
|
if (TSDB_DB_NAME_T == pTargetName->type) {
|
||||||
int32_t code = getDBVgInfoImpl(pCxt, pTargetName, pVgroupList);
|
int32_t code = getDBVgInfoImpl(pCxt, pTargetName, pVgroupList);
|
||||||
if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
|
if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
|
||||||
|
TSDB_CODE_MND_DB_IN_DROPPING == code) {
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
@ -2196,7 +2197,8 @@ static int32_t getTagsTableVgroupListImpl(STranslateContext* pCxt, SName* pTarge
|
||||||
} else {
|
} else {
|
||||||
taosArrayPush(*pVgroupList, &vgInfo);
|
taosArrayPush(*pVgroupList, &vgInfo);
|
||||||
}
|
}
|
||||||
} else if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
|
} else if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
|
||||||
|
TSDB_CODE_MND_DB_IN_DROPPING == code) {
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -32,8 +32,6 @@ typedef struct {
|
||||||
|
|
||||||
static SStreamGlobalEnv streamEnv;
|
static SStreamGlobalEnv streamEnv;
|
||||||
|
|
||||||
// int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch);
|
|
||||||
|
|
||||||
int32_t streamDispatch(SStreamTask* pTask);
|
int32_t streamDispatch(SStreamTask* pTask);
|
||||||
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData);
|
||||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||||
|
|
|
@ -240,43 +240,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
|
|
||||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
|
|
||||||
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
|
|
||||||
|
|
||||||
SStreamTaskRecoverRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
|
||||||
pCont->inputStatus = pTask->inputStatus;
|
|
||||||
pCont->streamId = pTask->streamId;
|
|
||||||
pCont->reqTaskId = pTask->taskId;
|
|
||||||
pCont->rspTaskId = pReq->upstreamTaskId;
|
|
||||||
|
|
||||||
pRsp->pCont = buf;
|
|
||||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp);
|
|
||||||
tmsgSendRsp(pRsp);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamProcessRecoverRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamRecoverDownstreamRsp* pRsp) {
|
|
||||||
streamProcessRunReq(pTask);
|
|
||||||
|
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
// scan data to recover
|
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__RECOVER;
|
|
||||||
pTask->taskStatus = TASK_STATUS__RECOVER_SELF;
|
|
||||||
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
|
|
||||||
if (streamPipelineExec(pTask, 100, true) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
|
||||||
pTask->taskStatus = TASK_STATUS__NORMAL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||||
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
|
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
|
||||||
|
|
||||||
|
|
|
@ -138,63 +138,39 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) {
|
int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) {
|
||||||
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
|
// fetch all queue item, merge according to batchLimit
|
||||||
|
int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall);
|
||||||
void* exec = pTask->exec.executor;
|
if (numOfItems == 0) {
|
||||||
|
qDebug("task: %d, stream task exec over, queue empty", pTask->taskId);
|
||||||
while (1) {
|
return 0;
|
||||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
|
||||||
if (pRes == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t batchCnt = 0;
|
|
||||||
while (1) {
|
|
||||||
SSDataBlock* output = NULL;
|
|
||||||
uint64_t ts = 0;
|
|
||||||
if (qExecTask(exec, &output, &ts) < 0) {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
if (output == NULL) break;
|
|
||||||
|
|
||||||
SSDataBlock block = {0};
|
|
||||||
assignOneDataBlock(&block, output);
|
|
||||||
block.info.childId = pTask->selfChildId;
|
|
||||||
taosArrayPush(pRes, &block);
|
|
||||||
|
|
||||||
if (++batchCnt >= batchNum) break;
|
|
||||||
}
|
|
||||||
if (taosArrayGetSize(pRes) == 0) {
|
|
||||||
taosArrayDestroy(pRes);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (dispatch) {
|
|
||||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
|
||||||
if (qRes == NULL) {
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
|
||||||
qRes->blocks = pRes;
|
|
||||||
qRes->childId = pTask->selfChildId;
|
|
||||||
|
|
||||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
|
||||||
taosFreeQitem(qRes);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
streamDispatch(pTask);
|
|
||||||
}
|
}
|
||||||
|
SStreamQueueItem* pMerged = NULL;
|
||||||
|
SStreamQueueItem* pItem = NULL;
|
||||||
|
taosGetQitem(pTask->inputQall, (void**)&pItem);
|
||||||
|
if (pItem == NULL) {
|
||||||
|
if (pMerged != NULL) {
|
||||||
|
// process merged item
|
||||||
} else {
|
} else {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if drop
|
||||||
|
if (pItem->type == STREAM_INPUT__DESTROY) {
|
||||||
|
// set status drop
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||||
|
ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK);
|
||||||
|
streamTaskOutput(pTask, (SStreamDataBlock*)pItem);
|
||||||
|
}
|
||||||
|
|
||||||
|
// exec impl
|
||||||
|
|
||||||
|
// output
|
||||||
|
// try dispatch
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -482,7 +482,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
|
||||||
|
|
||||||
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
|
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
|
||||||
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
|
pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
|
||||||
wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType));
|
wDebug("vgId:%d, wal write log %" PRId64 ", msgType: %s, cksum head %u cksum body %u", pWal->cfg.vgId, index,
|
||||||
|
TMSG_INFO(msgType), pWal->writeHead.cksumHead, pWal->writeHead.cksumBody);
|
||||||
|
|
||||||
code = walWriteIndex(pWal, index, offset);
|
code = walWriteIndex(pWal, index, offset);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
|
|
@ -26,12 +26,14 @@ SArray* taosArrayInit(size_t size, size_t elemSize) {
|
||||||
|
|
||||||
SArray* pArray = taosMemoryMalloc(sizeof(SArray));
|
SArray* pArray = taosMemoryMalloc(sizeof(SArray));
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pArray->size = 0;
|
pArray->size = 0;
|
||||||
pArray->pData = taosMemoryCalloc(size, elemSize);
|
pArray->pData = taosMemoryCalloc(size, elemSize);
|
||||||
if (pArray->pData == NULL) {
|
if (pArray->pData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosMemoryFree(pArray);
|
taosMemoryFree(pArray);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -225,11 +225,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ALREADY_EXIST, "Database already exis
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DATABASES, "Too many databases for account")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DATABASES, "Too many databases for account")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_DROPPING, "Database in dropping status")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_EXIST, "Database not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_EXIST, "Database not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_ACCT, "Invalid database account")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_ACCT, "Invalid database account")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_OPTION_UNCHANGED, "Database options not changed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_OPTION_UNCHANGED, "Database options not changed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating status")
|
||||||
|
|
||||||
// mnode-node
|
// mnode-node
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists")
|
||||||
|
|
|
@ -208,7 +208,7 @@ STaosQall *taosAllocateQall() {
|
||||||
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
|
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
|
||||||
|
|
||||||
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
||||||
int32_t code = 0;
|
int32_t numOfItems = 0;
|
||||||
bool empty;
|
bool empty;
|
||||||
|
|
||||||
taosThreadMutexLock(&queue->mutex);
|
taosThreadMutexLock(&queue->mutex);
|
||||||
|
@ -219,13 +219,14 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
||||||
qall->current = queue->head;
|
qall->current = queue->head;
|
||||||
qall->start = queue->head;
|
qall->start = queue->head;
|
||||||
qall->numOfItems = queue->numOfItems;
|
qall->numOfItems = queue->numOfItems;
|
||||||
code = qall->numOfItems;
|
numOfItems = qall->numOfItems;
|
||||||
|
|
||||||
queue->head = NULL;
|
queue->head = NULL;
|
||||||
queue->tail = NULL;
|
queue->tail = NULL;
|
||||||
queue->numOfItems = 0;
|
queue->numOfItems = 0;
|
||||||
queue->memOfItems = 0;
|
queue->memOfItems = 0;
|
||||||
uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems);
|
uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, numOfItems, queue, queue->numOfItems,
|
||||||
|
queue->memOfItems);
|
||||||
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,7 +238,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
|
||||||
qall->start = NULL;
|
qall->start = NULL;
|
||||||
qall->numOfItems = 0;
|
qall->numOfItems = 0;
|
||||||
}
|
}
|
||||||
return code;
|
return numOfItems;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
|
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
|
||||||
|
|
Loading…
Reference in New Issue