fix:conflicts from main

This commit is contained in:
wangmm0220 2023-07-09 12:52:18 +08:00
commit 35ba132a29
55 changed files with 578 additions and 395 deletions

View File

@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException; ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitSync() throws SQLException; void commitSync() throws SQLException;
void close() throws SQLException; void close() throws SQLException;

View File

@ -283,6 +283,8 @@ Provides dnode configuration information.
| 2 | consumer_group | BINARY(193) | Subscribed consumer group | | 2 | consumer_group | BINARY(193) | Subscribed consumer group |
| 3 | vgroup_id | INT | Vgroup ID for the consumer | | 3 | vgroup_id | INT | Vgroup ID for the consumer |
| 4 | consumer_id | BIGINT | Consumer ID | | 4 | consumer_id | BIGINT | Consumer ID |
| 5 | offset | BINARY(64) | Consumption progress |
| 6 | rows | BIGINT | Number of consumption items |
## INS_STREAMS ## INS_STREAMS

View File

@ -36,15 +36,16 @@ REST connection supports all platforms that can run Java.
| taos-jdbcdriver version | major changes | TDengine version | | taos-jdbcdriver version | major changes | TDengine version |
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: | | :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | 3.0.5.0 or later |
| 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later | | 3.2.3 | Fixed resultSet data parsing failure in some cases | 3.0.5.0 or later |
| 3.2.2 | subscription add seek function | 3.0.5.0 or later | | 3.2.2 | Subscription add seek function | 3.0.5.0 or later |
| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later | | 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later |
| 3.2.0 | This version has been deprecated | - | | 3.2.0 | This version has been deprecated | - |
| 3.1.0 | JDBC REST connection supports subscription over WebSocket | - | | 3.1.0 | JDBC REST connection supports subscription over WebSocket | - |
| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - | | 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - |
| 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later | | 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later |
| 2.0.42 | fix wasNull interface return value in WebSocket connection | - | | 2.0.42 | Fix wasNull interface return value in WebSocket connection | - |
| 2.0.41 | fix decode method of username and password in REST connection | - | | 2.0.41 | Fix decode method of username and password in REST connection | - |
| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - | | 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - |
| 2.0.38 | JDBC REST connections add bulk pull function | - | | 2.0.38 | JDBC REST connections add bulk pull function | - |
| 2.0.37 | Support json tags | - | | 2.0.37 | Support json tags | - |

View File

@ -81,10 +81,6 @@ Set<String> subscription() throws SQLException;
ConsumerRecords<V> poll(Duration timeout) throws SQLException; ConsumerRecords<V> poll(Duration timeout) throws SQLException;
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitSync() throws SQLException; void commitSync() throws SQLException;
void close() throws SQLException; void close() throws SQLException;

View File

@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。
| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 | | taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 |
| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: | | :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
| 3.2.4 | 数据订阅在 WebSocket 连接下增加 enable.auto.commit 参数,以及 unsubscribe() 方法。 | - |
| 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - | | 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - |
| 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 | | 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 |
| 3.2.1 | 新增功能WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更consumer poll 返回结果集为 ConsumerRecord可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 | | 3.2.1 | 新增功能WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更consumer poll 返回结果集为 ConsumerRecord可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 |

View File

@ -284,6 +284,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | | 2 | consumer_group | BINARY(193) | 订阅者的消费者组 |
| 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id |
| 4 | consumer_id | BIGINT | 消费者的唯一 id | | 4 | consumer_id | BIGINT | 消费者的唯一 id |
| 5 | offset | BINARY(64) | 消费者的消费进度 |
| 6 | rows | BIGINT | 消费者的消费的数据条数 |
## INS_STREAMS ## INS_STREAMS

View File

@ -46,6 +46,7 @@ typedef struct SRpcHandleInfo {
int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp) int8_t noResp; // has response or not(default 0, 0: resp, 1: no resp)
int8_t persistHandle; // persist handle or not int8_t persistHandle; // persist handle or not
int8_t hasEpSet; int8_t hasEpSet;
int32_t cliVer;
// app info // app info
void *ahandle; // app handle set by client void *ahandle; // app handle set by client
@ -83,6 +84,7 @@ typedef struct SRpcInit {
int32_t sessions; // number of sessions allowed int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int32_t idleTime; // milliseconds, 0 means idle timer is disabled int32_t idleTime; // milliseconds, 0 means idle timer is disabled
int32_t compatibilityVer;
int32_t retryMinInterval; // retry init interval int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor int32_t retryStepFactor; // retry interval factor

View File

@ -87,7 +87,7 @@ os.system("rm -rf /tmp/dumpdata/*")
# dump data out # dump data out
print("taosdump dump out data") print("taosdump dump out data")
os.system("taosdump -o /tmp/dumpdata -D test -y -h %s "%serverHost) os.system("taosdump -o /tmp/dumpdata -D test -h %s "%serverHost)
# drop database of test # drop database of test
print("drop database test") print("drop database test")
@ -95,7 +95,7 @@ os.system(" taos -s ' drop database test ;' -h %s "%serverHost)
# dump data in # dump data in
print("taosdump dump data in") print("taosdump dump data in")
os.system("taosdump -i /tmp/dumpdata -y -h %s "%serverHost) os.system("taosdump -i /tmp/dumpdata -h %s "%serverHost)
result = conn.query("SELECT count(*) from test.meters") result = conn.query("SELECT count(*) from test.meters")

View File

@ -152,7 +152,7 @@ function wgetFile {
file=$1 file=$1
versionPath=$2 versionPath=$2
sourceP=$3 sourceP=$3
nasServerIP="192.168.1.131" nasServerIP="192.168.1.213"
packagePath="/nas/TDengine/v${versionPath}/${verMode}" packagePath="/nas/TDengine/v${versionPath}/${verMode}"
if [ -f ${file} ];then if [ -f ${file} ];then
echoColor YD "${file} already exists ,it will delete it and download it again " echoColor YD "${file} already exists ,it will delete it and download it again "

View File

@ -29,6 +29,7 @@
#include "trpc.h" #include "trpc.h"
#include "tsched.h" #include "tsched.h"
#include "ttime.h" #include "ttime.h"
#include "tversion.h"
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
#include "cus_name.h" #include "cus_name.h"
@ -111,7 +112,8 @@ static void deregisterRequest(SRequestObj *pRequest) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
if (tsSlowLogScope & reqType) { if (tsSlowLogScope & reqType) {
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s", taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 ", Duration:%" PRId64 "us, SQL:%s",
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
pRequest->sqlstr);
} }
} }
@ -175,6 +177,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
void *pDnodeConn = rpcOpen(&rpcInit); void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) { if (pDnodeConn == NULL) {
tscError("failed to init connection to server"); tscError("failed to init connection to server");
@ -358,17 +362,16 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); } int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
void destroySubRequests(SRequestObj *pRequest) { void destroySubRequests(SRequestObj *pRequest) {
int32_t reqIdx = -1; int32_t reqIdx = -1;
SRequestObj *pReqList[16] = {NULL}; SRequestObj *pReqList[16] = {NULL};
uint64_t tmpRefId = 0; uint64_t tmpRefId = 0;
if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
return; return;
} }
SRequestObj* pTmp = pRequest; SRequestObj *pTmp = pRequest;
while (pTmp->relation.prevRefId) { while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId; tmpRefId = pTmp->relation.prevRefId;
pTmp = acquireRequest(tmpRefId); pTmp = acquireRequest(tmpRefId);
@ -376,9 +379,9 @@ void destroySubRequests(SRequestObj *pRequest) {
pReqList[++reqIdx] = pTmp; pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId); releaseRequest(tmpRefId);
} else { } else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
tmpRefId, pTmp->requestId); pTmp->requestId);
break; break;
} }
} }
@ -391,16 +394,15 @@ void destroySubRequests(SRequestObj *pRequest) {
pTmp = acquireRequest(tmpRefId); pTmp = acquireRequest(tmpRefId);
if (pTmp) { if (pTmp) {
tmpRefId = pTmp->relation.nextRefId; tmpRefId = pTmp->relation.nextRefId;
removeRequest(pTmp->self); removeRequest(pTmp->self);
releaseRequest(pTmp->self); releaseRequest(pTmp->self);
} else { } else {
tscError("0x%" PRIx64 " is not there", tmpRefId); tscError("0x%" PRIx64 " is not there", tmpRefId);
break; break;
} }
} }
} }
void doDestroyRequest(void *p) { void doDestroyRequest(void *p) {
if (NULL == p) { if (NULL == p) {
return; return;
@ -412,7 +414,7 @@ void doDestroyRequest(void *p) {
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
destroySubRequests(pRequest); destroySubRequests(pRequest);
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
@ -473,15 +475,15 @@ void taosStopQueryImpl(SRequestObj *pRequest) {
} }
void stopAllQueries(SRequestObj *pRequest) { void stopAllQueries(SRequestObj *pRequest) {
int32_t reqIdx = -1; int32_t reqIdx = -1;
SRequestObj *pReqList[16] = {NULL}; SRequestObj *pReqList[16] = {NULL};
uint64_t tmpRefId = 0; uint64_t tmpRefId = 0;
if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) { if (pRequest->relation.userRefId && pRequest->relation.userRefId != pRequest->self) {
return; return;
} }
SRequestObj* pTmp = pRequest; SRequestObj *pTmp = pRequest;
while (pTmp->relation.prevRefId) { while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId; tmpRefId = pTmp->relation.prevRefId;
pTmp = acquireRequest(tmpRefId); pTmp = acquireRequest(tmpRefId);
@ -489,9 +491,9 @@ void stopAllQueries(SRequestObj *pRequest) {
pReqList[++reqIdx] = pTmp; pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId); releaseRequest(tmpRefId);
} else { } else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
tmpRefId, pTmp->requestId); pTmp->requestId);
break; break;
} }
} }
@ -510,12 +512,11 @@ void stopAllQueries(SRequestObj *pRequest) {
releaseRequest(pTmp->self); releaseRequest(pTmp->self);
} else { } else {
tscError("0x%" PRIx64 " is not there", tmpRefId); tscError("0x%" PRIx64 " is not there", tmpRefId);
break; break;
} }
} }
} }
void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); } void crashReportThreadFuncUnexpectedStopped(void) { atomic_store_32(&clientStop, -1); }
static void *tscCrashReportThreadFp(void *param) { static void *tscCrashReportThreadFp(void *param) {

View File

@ -137,7 +137,6 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
vgInfo->hashSuffix = rsp->hashSuffix; vgInfo->hashSuffix = rsp->hashSuffix;
vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == vgInfo->vgHash) { if (NULL == vgInfo->vgHash) {
taosMemoryFree(vgInfo);
tscError("hash init[%d] failed", rsp->vgNum); tscError("hash init[%d] failed", rsp->vgNum);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _return; goto _return;
@ -147,8 +146,6 @@ static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j); SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) { if (taosHashPut(vgInfo->vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
tscError("hash push failed, errno:%d", errno); tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo->vgHash);
taosMemoryFree(vgInfo);
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _return; goto _return;
} }
@ -486,7 +483,6 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
if (code) { if (code) {
taosArrayDestroy(desc.subDesc); taosArrayDestroy(desc.subDesc);
desc.subDesc = NULL; desc.subDesc = NULL;
desc.subPlanNum = 0;
} }
desc.subPlanNum = taosArrayGetSize(desc.subDesc); desc.subPlanNum = taosArrayGetSize(desc.subDesc);
} else { } else {
@ -592,7 +588,7 @@ static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClient
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _return; goto _return;
} }
strncpy(user->user, pTscObj->user, TSDB_USER_LEN); tstrncpy(user->user, pTscObj->user, TSDB_USER_LEN);
user->version = htonl(-1); // force get userAuthInfo user->version = htonl(-1); // force get userAuthInfo
kv.valueLen = sizeof(SUserAuthVersion); kv.valueLen = sizeof(SUserAuthVersion);
kv.value = user; kv.value = user;

View File

@ -26,7 +26,7 @@
#include "tpagedbuf.h" #include "tpagedbuf.h"
#include "tref.h" #include "tref.h"
#include "tsched.h" #include "tsched.h"
#include "tversion.h"
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
@ -237,8 +237,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t buildPreviousRequest(SRequestObj *pRequest, const char* sql, SRequestObj** pNewRequest) { int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
int32_t code = buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0); int32_t code =
buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pRequest->relation.prevRefId = (*pNewRequest)->self; pRequest->relation.prevRefId = (*pNewRequest)->self;
(*pNewRequest)->relation.nextRefId = pRequest->self; (*pNewRequest)->relation.nextRefId = pRequest->self;
@ -502,8 +503,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
pResInfo->userFields[i].bytes = pSchema[i].bytes; pResInfo->userFields[i].bytes = pSchema[i].bytes;
pResInfo->userFields[i].type = pSchema[i].type; pResInfo->userFields[i].type = pSchema[i].type;
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) { } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
@ -891,7 +891,7 @@ static bool incompletaFileParsing(SNode* pStmt) {
void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) { void continuePostSubQuery(SRequestObj* pRequest, TAOS_ROW row) {
SSqlCallbackWrapper* pWrapper = pRequest->pWrapper; SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId); int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
int64_t analyseStart = taosGetTimestampUs(); int64_t analyseStart = taosGetTimestampUs();
code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row); code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, (void**)row);
@ -934,7 +934,7 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
if (rowNum > 0) { if (rowNum > 0) {
row = taos_fetch_row(res); // for single row only now row = taos_fetch_row(res); // for single row only now
} }
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId); SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
@ -2135,6 +2135,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) { if (clientRpc == NULL) {
@ -2494,11 +2495,10 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
return pRequest; return pRequest;
} }
static void fetchCallback(void* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*)param;
static void fetchCallback(void *pResult, void *param, int32_t code) { SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
SRequestObj *pRequest = (SRequestObj *)param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId); tstrerror(code), pRequest->requestId);
@ -2520,7 +2520,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
} }
pRequest->code = pRequest->code =
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true); setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
pRequest->code = code; pRequest->code = code;
@ -2531,19 +2531,19 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
pRequest->requestId); pRequest->requestId);
STscObj *pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary; SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
atomic_add_fetch_64((int64_t *)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
} }
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
} }
void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param) { void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
pRequest->body.fetchFp = fp; pRequest->body.fetchFp = fp;
pRequest->body.param = param; pRequest->body.param = param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
// this query has no results or error exists, return directly // this query has no results or error exists, return directly
if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) { if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
@ -2578,5 +2578,3 @@ void taosAsyncFetchImpl(SRequestObj *pRequest, __taos_async_fn_t fp, void *param
schedulerFetchRows(pRequest->body.queryJob, &req); schedulerFetchRows(pRequest->body.queryJob, &req);
} }

View File

@ -558,13 +558,12 @@ int taos_select_db(TAOS *taos, const char *db) {
return code; return code;
} }
void taos_stop_query(TAOS_RES *res) { void taos_stop_query(TAOS_RES *res) {
if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) { if (res == NULL || TD_RES_TMQ(res) || TD_RES_TMQ_META(res) || TD_RES_TMQ_METADATA(res)) {
return; return;
} }
stopAllQueries((SRequestObj*)res); stopAllQueries((SRequestObj *)res);
} }
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
@ -785,7 +784,7 @@ void destorySqlCallbackWrapper(SSqlCallbackWrapper *pWrapper) {
taosMemoryFree(pWrapper); taosMemoryFree(pWrapper);
} }
void destroyCtxInRequest(SRequestObj* pRequest) { void destroyCtxInRequest(SRequestObj *pRequest) {
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
qDestroyQuery(pRequest->pQuery); qDestroyQuery(pRequest->pQuery);
pRequest->pQuery = NULL; pRequest->pQuery = NULL;
@ -793,7 +792,6 @@ void destroyCtxInRequest(SRequestObj* pRequest) {
pRequest->pWrapper = NULL; pRequest->pWrapper = NULL;
} }
static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) { static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t code) {
SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param; SSqlCallbackWrapper *pWrapper = (SSqlCallbackWrapper *)param;
SRequestObj *pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
@ -807,15 +805,15 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pParseCtx, pWrapper->pCatalogReq, pResultMeta, pQuery);
} }
pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart; pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
handleQueryAnslyseRes(pWrapper, pResultMeta, code); handleQueryAnslyseRes(pWrapper, pResultMeta, code);
} }
int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) { int32_t cloneCatalogReq(SCatalogReq **ppTarget, SCatalogReq *pSrc) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SCatalogReq* pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq)); SCatalogReq *pTarget = taosMemoryCalloc(1, sizeof(SCatalogReq));
if (pTarget == NULL) { if (pTarget == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} else { } else {
@ -842,17 +840,16 @@ int32_t cloneCatalogReq(SCatalogReq* * ppTarget, SCatalogReq* pSrc) {
return code; return code;
} }
void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode *pRoot) {
void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, SNode* pRoot) { SRequestObj *pNewRequest = NULL;
SRequestObj* pNewRequest = NULL; SSqlCallbackWrapper *pNewWrapper = NULL;
SSqlCallbackWrapper* pNewWrapper = NULL; int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest);
int32_t code = buildPreviousRequest(pWrapper->pRequest, pWrapper->pRequest->sqlstr, &pNewRequest);
if (code) { if (code) {
handleQueryAnslyseRes(pWrapper, pResultMeta, code); handleQueryAnslyseRes(pWrapper, pResultMeta, code);
return; return;
} }
pNewRequest->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); pNewRequest->pQuery = (SQuery *)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pNewRequest->pQuery) { if (NULL == pNewRequest->pQuery) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} else { } else {
@ -871,16 +868,16 @@ void handleSubQueryFromAnalyse(SSqlCallbackWrapper *pWrapper, SMetaData *pResult
} }
void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) { void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta, int32_t code) {
SRequestObj *pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery; SQuery *pQuery = pRequest->pQuery;
if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) { if (code == TSDB_CODE_SUCCESS && pQuery->pPrevRoot) {
SNode* prevRoot = pQuery->pPrevRoot; SNode *prevRoot = pQuery->pPrevRoot;
pQuery->pPrevRoot = NULL; pQuery->pPrevRoot = NULL;
handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot); handleSubQueryFromAnalyse(pWrapper, pResultMeta, prevRoot);
return; return;
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pRequest->stableQuery = pQuery->stableQuery; pRequest->stableQuery = pQuery->stableQuery;
if (pQuery->pRoot) { if (pQuery->pRoot) {
@ -1043,7 +1040,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
} }
int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) { int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *pRequest, bool updateMetaForce) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper)); SSqlCallbackWrapper *pWrapper = taosMemoryCalloc(1, sizeof(SSqlCallbackWrapper));
if (pWrapper == NULL) { if (pWrapper == NULL) {
@ -1081,7 +1078,6 @@ int32_t prepareAndParseSqlSyntax(SSqlCallbackWrapper **ppWrapper, SRequestObj *p
return code; return code;
} }
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SSqlCallbackWrapper *pWrapper = NULL; SSqlCallbackWrapper *pWrapper = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1128,12 +1124,12 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
} }
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) { void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
int32_t reqIdx = 0; int32_t reqIdx = 0;
SRequestObj *pReqList[16] = {NULL}; SRequestObj *pReqList[16] = {NULL};
SRequestObj *pUserReq = NULL; SRequestObj *pUserReq = NULL;
pReqList[0] = pRequest; pReqList[0] = pRequest;
uint64_t tmpRefId = 0; uint64_t tmpRefId = 0;
SRequestObj* pTmp = pRequest; SRequestObj *pTmp = pRequest;
while (pTmp->relation.prevRefId) { while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId; tmpRefId = pTmp->relation.prevRefId;
pTmp = acquireRequest(tmpRefId); pTmp = acquireRequest(tmpRefId);
@ -1141,9 +1137,9 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
pReqList[++reqIdx] = pTmp; pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId); releaseRequest(tmpRefId);
} else { } else {
tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tscError("0x%" PRIx64 ", prev req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pTmp->self, tmpRefId,
tmpRefId, pTmp->requestId); pTmp->requestId);
break; break;
} }
} }
@ -1152,11 +1148,11 @@ void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
pTmp = acquireRequest(tmpRefId); pTmp = acquireRequest(tmpRefId);
if (pTmp) { if (pTmp) {
tmpRefId = pTmp->relation.nextRefId; tmpRefId = pTmp->relation.nextRefId;
removeRequest(pTmp->self); removeRequest(pTmp->self);
releaseRequest(pTmp->self); releaseRequest(pTmp->self);
} else { } else {
tscError("0x%" PRIx64 " is not there", tmpRefId); tscError("0x%" PRIx64 " is not there", tmpRefId);
break; break;
} }
} }

View File

@ -1286,6 +1286,10 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
taosArrayPush(pArray, &pVgData); taosArrayPush(pArray, &pVgData);
pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY);
if (NULL == pQuery) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_VND_ALTER_TABLE; pQuery->msgType = TDMT_VND_ALTER_TABLE;
pQuery->stableQuery = false; pQuery->stableQuery = false;

View File

@ -678,6 +678,8 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam); pCommitFp(tmq, code, userParam);
} }
// update the offset value.
pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
} else { // do not perform commit, callback user function directly. } else { // do not perform commit, callback user function directly.
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam); pCommitFp(tmq, code, userParam);
@ -1877,6 +1879,23 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return 0; return 0;
} }
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, int64_t ever, int64_t consumerId){
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.currentOffset = *offset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
}
// update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
pVg->offsetInfo.walVerBegin = sver;
pVg->offsetInfo.walVerEnd = ever;
pVg->receivedInfoFromVnode = true;
}
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
@ -1925,22 +1944,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
// update the local offset value only for the returned values, only when the local offset is NOT updated updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set, rsp offset:%d,%"PRId64, tmq->consumerId, pDataRsp->rspOffset.type, pDataRsp->rspOffset.version);
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
}
// update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
pVg->receivedInfoFromVnode = true;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
@ -1990,11 +1994,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp // build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
@ -2022,18 +2022,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
// update the local offset value only for the returned values, only when the local offset is NOT updated updateVgInfo(pVg, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
}
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->taosxRsp.blockNum == 0) { if (pollRspWrapper->taosxRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
@ -2615,6 +2604,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->end = pClientVg->offsetInfo.walVerEnd;
pAssignment->vgId = pClientVg->vgId; pAssignment->vgId = pClientVg->vgId;
tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId,
pAssignment->vgId, pAssignment->currentOffset);
} }
if (needFetch) { if (needFetch) {
@ -2690,7 +2681,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
} }
@ -2718,17 +2709,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; // pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
char offsetBuf[TSDB_OFFSET_LEN] = {0}; // char offsetBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); // tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerBegin = p->begin;
pOffsetInfo->walVerEnd = p->end; pOffsetInfo->walVerEnd = p->end;
pOffsetInfo->currentOffset.version = p->currentOffset; // pOffsetInfo->currentOffset.version = p->currentOffset;
pOffsetInfo->committedOffset.version = p->currentOffset; // pOffsetInfo->committedOffset.version = p->currentOffset;
} }
} }
} }

View File

@ -1073,6 +1073,146 @@ TEST(clientCase, sub_db_test) {
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
} }
TEST(clientCase, td_25129) {
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "tp");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL;
int32_t numOfFields = 0;
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 2000;
int32_t count = 0;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[128];
const char* topicName = tmq_get_topic_name(pRes);
// const char* dbName = tmq_get_db_name(pRes);
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
//
// printf("topic: %s\n", topicName);
// printf("db: %s\n", dbName);
// printf("vgroup id: %d\n", vgroupId);
printSubResults(pRes, &totalRows);
} else {
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
continue;
}
// tmq_commit_sync(tmq, pRes);
if (pRes != NULL) {
taos_free_result(pRes);
// if ((++count) > 1) {
// break;
// }
} else {
break;
}
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
}
tmq_free_assignment(pAssign);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
TEST(clientCase, sub_tb_test) { TEST(clientCase, sub_tb_test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");

View File

@ -423,7 +423,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
val = (const void *)&pColVal->value.val; val = (const void *)&pColVal->value.val;
} }
} else { } else {
pColVal = NULL; // pColVal = NULL;
valType = TD_VTYPE_NONE; valType = TD_VTYPE_NONE;
} }

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
#include "qworker.h" #include "qworker.h"
#include "tversion.h"
static inline void dmSendRsp(SRpcMsg *pMsg) { rpcSendResponse(pMsg); } static inline void dmSendRsp(SRpcMsg *pMsg) { rpcSendResponse(pMsg); }
@ -73,6 +74,13 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
int32_t svrVer = 0;
taosVersionStrToInt(version, &svrVer);
if (0 != taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) {
dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer);
goto _OVER;
}
switch (pRpc->msgType) { switch (pRpc->msgType) {
case TDMT_DND_NET_TEST: case TDMT_DND_NET_TEST:
dmProcessNetTestReq(pDnode, pRpc); dmProcessNetTestReq(pDnode, pRpc);
@ -305,6 +313,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.supportBatch = 1; rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024; rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {
@ -339,7 +348,7 @@ int32_t dmInitServer(SDnode *pDnode) {
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
rpcInit.compressSize = tsCompressMsgSize; rpcInit.compressSize = tsCompressMsgSize;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
pTrans->serverRpc = rpcOpen(&rpcInit); pTrans->serverRpc = rpcOpen(&rpcInit);
if (pTrans->serverRpc == NULL) { if (pTrans->serverRpc == NULL) {
dError("failed to init dnode rpc server"); dError("failed to init dnode rpc server");

View File

@ -16,6 +16,7 @@
#include "sut.h" #include "sut.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tmisce.h" #include "tmisce.h"
#include "tversion.h"
static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) { static void processClientRsp(void* parent, SRpcMsg* pRsp, SEpSet* pEpSet) {
TestClient* client = (TestClient*)parent; TestClient* client = (TestClient*)parent;
@ -53,6 +54,7 @@ void TestClient::DoInit() {
rpcInit.parent = this; rpcInit.parent = this;
// rpcInit.secret = (char*)secretEncrypt; // rpcInit.secret = (char*)secretEncrypt;
// rpcInit.spi = 1; // rpcInit.spi = 1;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
ASSERT(clientRpc); ASSERT(clientRpc);

View File

@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
taosArrayPush(pOutput->newConsumers, &consumerId); taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId);
} }
} }

View File

@ -79,16 +79,18 @@ typedef struct {
TXN* pTxn; TXN* pTxn;
} STtlDelTtlCtx; } STtlDelTtlCtx;
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback); int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
int ttlMgrClose(STtlManger* pTtlMgr); void ttlMgrClose(STtlManger* pTtlMgr);
int ttlMgrBegin(STtlManger* pTtlMgr, void* pMeta); int ttlMgrPostOpen(STtlManger* pTtlMgr, void* pMeta);
int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta); bool ttlMgrNeedUpgrade(TDB* pEnv);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn); int ttlMgrUpgrade(STtlManger* pTtlMgr, void* pMeta);
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx); int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx); int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx); int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids); int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -136,6 +136,7 @@ typedef struct STbUidStore STbUidStore;
#define META_BEGIN_HEAP_NIL 2 #define META_BEGIN_HEAP_NIL 2
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
int metaUpgrade(SVnode* pVnode, SMeta** ppMeta);
int metaClose(SMeta** pMeta); int metaClose(SMeta** pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys); int metaBegin(SMeta* pMeta, int8_t fromSys);
TXN* metaGetTxn(SMeta* pMeta); TXN* metaGetTxn(SMeta* pMeta);

View File

@ -40,10 +40,6 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
return -1; return -1;
} }
if (ttlMgrBegin(pMeta->pTtlMgr, pMeta) < 0) {
return -1;
}
tdbCommit(pMeta->pEnv, pMeta->txn); tdbCommit(pMeta->pEnv, pMeta->txn);
return 0; return 0;

View File

@ -29,6 +29,8 @@ static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
static void metaCleanup(SMeta **ppMeta);
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
int ret; int ret;
@ -180,51 +182,43 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
return 0; return 0;
_err: _err:
if (pMeta->pIdx) metaCloseIdx(pMeta); metaCleanup(&pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFree(pMeta);
return -1; return -1;
} }
int metaClose(SMeta **ppMeta) { int metaUpgrade(SVnode *pVnode, SMeta **ppMeta) {
int code = TSDB_CODE_SUCCESS;
SMeta *pMeta = *ppMeta; SMeta *pMeta = *ppMeta;
if (pMeta) {
if (pMeta->pEnv) metaAbort(pMeta);
if (pMeta->pCache) metaCacheClose(pMeta);
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFreeClear(*ppMeta); if (ttlMgrNeedUpgrade(pMeta->pEnv)) {
code = metaBegin(pMeta, META_BEGIN_HEAP_OS);
if (code < 0) {
metaError("vgId:%d, failed to upgrade meta, meta begin failed since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
code = ttlMgrUpgrade(pMeta->pTtlMgr, pMeta);
if (code < 0) {
metaError("vgId:%d, failed to upgrade meta ttl since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
code = metaCommit(pMeta, pMeta->txn);
if (code < 0) {
metaError("vgId:%d, failed to upgrade meta ttl, meta commit failed since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
} }
return TSDB_CODE_SUCCESS;
_err:
metaCleanup(ppMeta);
return code;
}
int metaClose(SMeta **ppMeta) {
metaCleanup(ppMeta);
return 0; return 0;
} }
@ -270,6 +264,32 @@ int32_t metaULock(SMeta *pMeta) {
return ret; return ret;
} }
static void metaCleanup(SMeta **ppMeta) {
SMeta *pMeta = *ppMeta;
if (pMeta) {
if (pMeta->pEnv) metaAbort(pMeta);
if (pMeta->pCache) metaCacheClose(pMeta);
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
if (pMeta->pSuidIdx) tdbTbClose(pMeta->pSuidIdx);
if (pMeta->pNameIdx) tdbTbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbTbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbTbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbTbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFreeClear(*ppMeta);
}
}
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1; STbDbKey *pTbDbKey1 = (STbDbKey *)pKey1;
STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2; STbDbKey *pTbDbKey2 = (STbDbKey *)pKey2;

View File

@ -207,7 +207,10 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tb_uid_t uid = *(tb_uid_t *)pData; tb_uid_t uid = *(tb_uid_t *)pData;
tdbFree(pData); tdbFree(pData);
SMetaInfo info; SMetaInfo info;
metaGetInfo(pMeta, uid, &info, NULL); if (metaGetInfo(pMeta, uid, &info, NULL) == TSDB_CODE_NOT_FOUND) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
if (info.uid == info.suid) { if (info.uid == info.suid) {
return 0; return 0;
} else { } else {
@ -939,7 +942,7 @@ int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
return 0; return 0;
} }
metaInfo("ttl find expired table count: %zu" , TARRAY_SIZE(tbUids)); metaInfo("ttl find expired table count: %zu", TARRAY_SIZE(tbUids));
metaDropTables(pMeta, tbUids); metaDropTables(pMeta, tbUids);
return 0; return 0;

View File

@ -21,6 +21,10 @@ typedef struct {
SMeta *pMeta; SMeta *pMeta;
} SConvertData; } SConvertData;
static void ttlMgrCleanup(STtlManger *pTtlMgr);
static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta);
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid); static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
@ -36,27 +40,17 @@ const char *ttlTbname = "ttl.idx";
const char *ttlV1Tbname = "ttlv1.idx"; const char *ttlV1Tbname = "ttlv1.idx";
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) { int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
int ret; int ret = TSDB_CODE_SUCCESS;
int64_t startNs = taosGetTimestampNs();
*ppTtlMgr = NULL; *ppTtlMgr = NULL;
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr)); STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
if (pTtlMgr == NULL) { if (pTtlMgr == NULL) return TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tdbTbExist(ttlTbname, pEnv)) {
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pEnv, &pTtlMgr->pOldTtlIdx, rollback);
if (ret < 0) {
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
return ret;
}
}
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback); ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) { if (ret < 0) {
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno)); metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
tdbOsFree(pTtlMgr); tdbOsFree(pTtlMgr);
return ret; return ret;
} }
@ -66,42 +60,57 @@ int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
taosThreadRwlockInit(&pTtlMgr->lock, NULL); taosThreadRwlockInit(&pTtlMgr->lock, NULL);
ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) {
metaError("failed to fill hash since %s", tstrerror(terrno));
ttlMgrCleanup(pTtlMgr);
return ret;
}
int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs);
*ppTtlMgr = pTtlMgr; *ppTtlMgr = pTtlMgr;
return 0; return TSDB_CODE_SUCCESS;
} }
int ttlMgrClose(STtlManger *pTtlMgr) { void ttlMgrClose(STtlManger *pTtlMgr) { ttlMgrCleanup(pTtlMgr); }
taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids); bool ttlMgrNeedUpgrade(TDB *pEnv) {
tdbTbClose(pTtlMgr->pTtlIdx); bool needUpgrade = tdbTbExist(ttlTbname, pEnv);
taosThreadRwlockDestroy(&pTtlMgr->lock); if (needUpgrade) {
tdbOsFree(pTtlMgr); metaInfo("find ttl idx in old version , will convert");
return 0; }
return needUpgrade;
} }
int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) { int ttlMgrUpgrade(STtlManger *pTtlMgr, void *pMeta) {
metaInfo("ttl mgr start open"); SMeta *meta = (SMeta *)pMeta;
int ret; int ret = TSDB_CODE_SUCCESS;
if (!tdbTbExist(ttlTbname, meta->pEnv)) return TSDB_CODE_SUCCESS;
metaInfo("ttl mgr start upgrade");
int64_t startNs = taosGetTimestampNs(); int64_t startNs = taosGetTimestampNs();
SMeta *meta = (SMeta *)pMeta; ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, meta->pEnv, &pTtlMgr->pOldTtlIdx, 0);
if (ret < 0) {
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
goto _out;
}
if (pTtlMgr->pOldTtlIdx) { ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta); if (ret < 0) {
if (ret < 0) { metaError("failed to convert ttl index since %s", tstrerror(terrno));
metaError("failed to convert ttl index since %s", tstrerror(terrno)); goto _out;
goto _out; }
}
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn); ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
if (ret < 0) { if (ret < 0) {
metaError("failed to drop old ttl index since %s", tstrerror(terrno)); metaError("failed to drop old ttl index since %s", tstrerror(terrno));
goto _out; goto _out;
}
tdbTbClose(pTtlMgr->pOldTtlIdx);
pTtlMgr->pOldTtlIdx = NULL;
} }
ret = ttlMgrFillCache(pTtlMgr); ret = ttlMgrFillCache(pTtlMgr);
@ -111,13 +120,23 @@ int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) {
} }
int64_t endNs = taosGetTimestampNs(); int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr upgrade end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs); endNs - startNs);
_out: _out:
tdbTbClose(pTtlMgr->pOldTtlIdx);
pTtlMgr->pOldTtlIdx = NULL;
return ret; return ret;
} }
static void ttlMgrCleanup(STtlManger *pTtlMgr) {
taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids);
tdbTbClose(pTtlMgr->pTtlIdx);
taosThreadRwlockDestroy(&pTtlMgr->lock);
tdbOsFree(pTtlMgr);
}
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) { static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
if (ttlDays <= 0) return; if (ttlDays <= 0) return;
@ -205,7 +224,7 @@ _out:
return ret; return ret;
} }
int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) { static int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
SMeta *meta = pMeta; SMeta *meta = pMeta;
metaInfo("ttlMgr convert ttl start."); metaInfo("ttlMgr convert ttl start.");

View File

@ -507,7 +507,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
if (!exec) { if (!exec) {
tqSetHandleExec(pHandle); tqSetHandleExec(pHandle);
// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
req.subKey, pHandle); req.subKey, pHandle);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
break; break;
@ -535,7 +535,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
tqSetHandleIdle(pHandle); tqSetHandleIdle(pHandle);
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
req.subKey, pHandle); req.subKey, pHandle);
return code; return code;
} }
@ -577,48 +577,47 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
SMqDataRsp dataRsp = {0}; SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req); tqInitDataRsp(&dataRsp, &req);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); if (req.useSnapshot == true) {
if (pOffset != NULL) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
if (pOffset->val.type != TMQ_OFFSET__LOG) { terrno = TSDB_CODE_INVALID_PARA;
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey); tDeleteMqDataRsp(&dataRsp);
terrno = TSDB_CODE_INVALID_PARA; return -1;
tDeleteMqDataRsp(&dataRsp); }
return -1;
}
dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
dataRsp.rspOffset.version = pOffset->val.version;
} else {
if (req.useSnapshot == true) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
dataRsp.rspOffset.type = TMQ_OFFSET__LOG; if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = reqOffset.version;
if (reqOffset.type == TMQ_OFFSET__LOG) { } else if(reqOffset.type < 0){
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
if (currentVer == -1) { // not start to read data from wal yet, return req offset directly if (pOffset != NULL) {
dataRsp.rspOffset.version = reqOffset.version; if (pOffset->val.type != TMQ_OFFSET__LOG) {
} else { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
dataRsp.rspOffset.version = currentVer; // return current consume offset value terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
} }
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position dataRsp.rspOffset.version = pOffset->val.version;
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
dataRsp.rspOffset.version = ever; }else{
} else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
reqOffset.type); } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
terrno = TSDB_CODE_INVALID_PARA; dataRsp.rspOffset.version = ever;
tDeleteMqDataRsp(&dataRsp); }
return -1; tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version);
} }
} else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
reqOffset.type);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
} }
tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
tDeleteMqDataRsp(&dataRsp);
return 0; return 0;
} }

View File

@ -1031,7 +1031,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
return code; return code;
} }
/*
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0; int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache; SLRUCache *pCache = pTsdb->lruCache;
@ -1079,7 +1079,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR
return code; return code;
} }
*/
int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) { int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey) {
int32_t code = 0; int32_t code = 0;
// fetch schema // fetch schema
@ -1829,10 +1829,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
} }
*pIgnoreEarlierTs = false; *pIgnoreEarlierTs = false;
/*
if (!hasVal) { if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET; state->state = SFSLASTNEXTROW_FILESET;
} }
*/
if (!state->checkRemainingRow) { if (!state->checkRemainingRow) {
state->checkRemainingRow = true; state->checkRemainingRow = true;
} }
@ -2020,10 +2021,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
if (block.maxKey.ts <= state->lastTs) { if (block.maxKey.ts <= state->lastTs) {
*pIgnoreEarlierTs = true; *pIgnoreEarlierTs = true;
if (state->pBlockData) {
tBlockDataDestroy(state->pBlockData); tBlockDataDestroy(state->pBlockData);
state->pBlockData = NULL; state->pBlockData = NULL;
}
*ppRow = NULL; *ppRow = NULL;
return code; return code;
@ -3176,97 +3176,46 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
TSKEY rowTs = TSDBROW_TS(pRow); TSKEY rowTs = TSDBROW_TS(pRow);
if (lastRowTs == TSKEY_MAX) { lastRowTs = rowTs;
lastRowTs = rowTs;
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
if (iCol >= nLastCol) {
break;
}
SLastCol *pCol = taosArrayGet(pColArray, iCol);
if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
continue;
}
if (slotIds[iCol] == 0) {
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs});
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
*pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) {
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
if (pCol->colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
if (pColVal->value.nData > 0) {
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
}
/*if (COL_VAL_IS_NONE(pColVal)) {
if (!setNoneCol) {
noneCol = iCol;
setNoneCol = true;
}
} else {*/
int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
if (aColIndex >= 0) {
taosArrayRemove(aColArray, aColIndex);
}
//}
}
if (!setNoneCol) {
// done, goto return pColArray
break;
} else {
continue;
}
}
// merge into pColArray
setNoneCol = false;
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
if (iCol >= nLastCol) { if (iCol >= nLastCol) {
break; break;
} }
// high version's column value SLastCol *pCol = taosArrayGet(pColArray, iCol);
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol); if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) {
continue; continue;
} }
SColVal *tColVal = &lastColVal->colVal; if (slotIds[iCol] == 0) {
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = rowTs});
taosArraySet(pColArray, 0, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
continue;
}
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); *pCol = (SLastCol){.ts = rowTs, .colVal = *pColVal};
if (lastCol.colVal.value.pData == NULL) { if (IS_VAR_DATA_TYPE(pColVal->type) /*&& pColVal->value.nData > 0*/) {
terrno = TSDB_CODE_OUT_OF_MEMORY; pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
code = TSDB_CODE_OUT_OF_MEMORY; if (pCol->colVal.value.pData == NULL) {
goto _err; terrno = TSDB_CODE_OUT_OF_MEMORY;
} code = TSDB_CODE_OUT_OF_MEMORY;
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); goto _err;
} }
if (pColVal->value.nData > 0) {
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
}
taosArraySet(pColArray, iCol, &lastCol); int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ);
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); if (aColIndex >= 0) {
taosArrayRemove(aColArray, aColIndex); taosArrayRemove(aColArray, aColIndex);
} else if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
} }
} }
} while (setNoneCol);
break;
} while (1);
if (!hasRow) { if (!hasRow) {
if (ignoreEarlierTs) { if (ignoreEarlierTs) {

View File

@ -76,7 +76,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
} }
SSyncCfg *pCfg = &info.config.syncCfg; SSyncCfg *pCfg = &info.config.syncCfg;
pCfg->replicaNum = 0; pCfg->replicaNum = 0;
pCfg->totalReplicaNum = 0; pCfg->totalReplicaNum = 0;
memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo)); memset(&pCfg->nodeInfo, 0, sizeof(pCfg->nodeInfo));
@ -109,7 +109,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex; pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
} }
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d",
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex); pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex);
info.config.syncCfg = *pCfg; info.config.syncCfg = *pCfg;
@ -371,6 +371,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
}
// open tsdb // open tsdb
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) { if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));

View File

@ -341,13 +341,10 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, SUserAuthInfo *pReq,
SCtgAuthReq req = {0}; SCtgAuthReq req = {0};
req.pRawReq = pReq; req.pRawReq = pReq;
req.pConn = pConn; req.pConn = pConn;
req.onlyCache = exists ? true : false; req.onlyCache = false;
CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pReq->user, &req.authInfo, NULL)); CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pReq->user, &req.authInfo, NULL));
CTG_ERR_JRET(ctgChkSetAuthRes(pCtg, &req, &rsp)); CTG_ERR_JRET(ctgChkSetAuthRes(pCtg, &req, &rsp));
if (rsp.metaNotExists && exists) {
*exists = false;
}
_return: _return:

View File

@ -1721,9 +1721,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
meta->tableType); meta->tableType);
if (pCache) { CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -926,7 +926,6 @@ int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList) {
} }
pIter = taosHashIterate(vgHash, pIter); pIter = taosHashIterate(vgHash, pIter);
vgInfo = NULL;
} }
*pList = vgList; *pList = vgList;

View File

@ -87,7 +87,7 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
return code; return code;
} }
static void setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, int32_t numOfRows, STableMeta* pMeta) { static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, int32_t numOfRows, STableMeta* pMeta) {
blockDataEnsureCapacity(pBlock, numOfRows); blockDataEnsureCapacity(pBlock, numOfRows);
pBlock->info.rows = 0; pBlock->info.rows = 0;
@ -114,6 +114,11 @@ static void setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, in
colDataSetVal(pCol4, pBlock->info.rows, buf, false); colDataSetVal(pCol4, pBlock->info.rows, buf, false);
++(pBlock->info.rows); ++(pBlock->info.rows);
} }
if (pBlock->info.rows <= 0) {
qError("no permission to view any columns");
return TSDB_CODE_PAR_PERMISSION_DENIED;
}
return TSDB_CODE_SUCCESS;
} }
static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** pRsp) { static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** pRsp) {
@ -123,7 +128,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp**
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
int32_t code = buildDescResultDataBlock(&pBlock); int32_t code = buildDescResultDataBlock(&pBlock);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta); code = setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp); code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp);

View File

@ -29,6 +29,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "tmisce.h" #include "tmisce.h"
#include "tversion.h"
// clang-format on // clang-format on
#define UDFD_MAX_SCRIPT_PLUGINS 64 #define UDFD_MAX_SCRIPT_PLUGINS 64
@ -1038,7 +1039,7 @@ int32_t udfdOpenClientRpc() {
connLimitNum = TMIN(connLimitNum, 500); connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
global.clientRpc = rpcOpen(&rpcInit); global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) { if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client"); fnError("failed to init dnode rpc client");

View File

@ -6044,6 +6044,9 @@ static int32_t checkCollectTopicTags(STranslateContext* pCxt, SCreateTopicStmt*
// for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) { // for (int32_t i = 0; i < pMeta->tableInfo.numOfColumns; ++i) {
SSchema* column = &pMeta->schema[0]; SSchema* column = &pMeta->schema[0];
SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* col = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == col) {
return TSDB_CODE_OUT_OF_MEMORY;
}
strcpy(col->colName, column->name); strcpy(col->colName, column->name);
strcpy(col->node.aliasName, col->colName); strcpy(col->node.aliasName, col->colName);
strcpy(col->node.userAlias, col->colName); strcpy(col->node.userAlias, col->colName);
@ -6154,7 +6157,7 @@ static int32_t translateAlterLocal(STranslateContext* pCxt, SAlterLocalStmt* pSt
char* p = strchr(pStmt->config, ' '); char* p = strchr(pStmt->config, ' ');
if (NULL != p) { if (NULL != p) {
*p = 0; *p = 0;
strcpy(pStmt->value, p + 1); tstrncpy(pStmt->value, p + 1, sizeof(pStmt->value));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -135,6 +135,7 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break; break;
case JOB_TASK_STATUS_DROP: case JOB_TASK_STATUS_DROP:
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
break;
default: default:
SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus)); SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));

View File

@ -392,6 +392,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
// NEVER REACH HERE // NEVER REACH HERE
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId); SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
} }
case TDMT_SCH_LINK_BROKEN: case TDMT_SCH_LINK_BROKEN:
SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode)); SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));

View File

@ -961,7 +961,6 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
localRsp->rsp.numOfPlans = 0; localRsp->rsp.numOfPlans = 0;
localRsp->rsp.subplanInfo = NULL; localRsp->rsp.subplanInfo = NULL;
pTask = NULL; pTask = NULL;
pJob = NULL;
} }
_return: _return:

View File

@ -403,6 +403,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// wait for the task to be ready to go // wait for the task to be ready to go
while (pTask->taskLevel == TASK_LEVEL__SOURCE) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus); int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status == TASK_STATUS__DROPPING) {
break;
}
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status); qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status);
taosMsleep(100); taosMsleep(100);

View File

@ -21,6 +21,7 @@
#include "tglobal.h" #include "tglobal.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "tversion.h"
bool gRaftDetailLog = false; bool gRaftDetailLog = false;
SSyncIO *gSyncIO = NULL; SSyncIO *gSyncIO = NULL;
@ -188,7 +189,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
rpcInit.idleTime = 100; rpcInit.idleTime = 100;
rpcInit.user = "sync-io"; rpcInit.user = "sync-io";
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
io->clientRpc = rpcOpen(&rpcInit); io->clientRpc = rpcOpen(&rpcInit);
if (io->clientRpc == NULL) { if (io->clientRpc == NULL) {
sError("failed to initialize RPC"); sError("failed to initialize RPC");
@ -209,7 +210,7 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
rpcInit.idleTime = 2 * 1500; rpcInit.idleTime = 2 * 1500;
rpcInit.parent = io; rpcInit.parent = io;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
sError("failed to start RPC server"); sError("failed to start RPC server");
@ -470,11 +471,10 @@ static void syncIOTickPing(void *param, void *tmrId) {
taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer); taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer);
} }
void syncEntryDestory(SSyncRaftEntry* pEntry) {} void syncEntryDestory(SSyncRaftEntry *pEntry) {}
void syncUtilMsgNtoH(void *msg) {
void syncUtilMsgNtoH(void* msg) { SMsgHead *pHead = msg;
SMsgHead* pHead = msg;
pHead->contLen = ntohl(pHead->contLen); pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId); pHead->vgId = ntohl(pHead->vgId);
} }
@ -487,9 +487,9 @@ static inline bool syncUtilCanPrint(char c) {
} }
} }
char* syncUtilPrintBin(char* ptr, uint32_t len) { char *syncUtilPrintBin(char *ptr, uint32_t len) {
int64_t memLen = (int64_t)(len + 1); int64_t memLen = (int64_t)(len + 1);
char* s = taosMemoryMalloc(memLen); char *s = taosMemoryMalloc(memLen);
ASSERT(s != NULL); ASSERT(s != NULL);
memset(s, 0, len + 1); memset(s, 0, len + 1);
memcpy(s, ptr, len); memcpy(s, ptr, len);
@ -502,13 +502,13 @@ char* syncUtilPrintBin(char* ptr, uint32_t len) {
return s; return s;
} }
char* syncUtilPrintBin2(char* ptr, uint32_t len) { char *syncUtilPrintBin2(char *ptr, uint32_t len) {
uint32_t len2 = len * 4 + 1; uint32_t len2 = len * 4 + 1;
char* s = taosMemoryMalloc(len2); char *s = taosMemoryMalloc(len2);
ASSERT(s != NULL); ASSERT(s != NULL);
memset(s, 0, len2); memset(s, 0, len2);
char* p = s; char *p = s;
for (int32_t i = 0; i < len; ++i) { for (int32_t i = 0; i < len; ++i) {
int32_t n = sprintf(p, "%d,", ptr[i]); int32_t n = sprintf(p, "%d,", ptr[i]);
p += n; p += n;
@ -516,7 +516,7 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) {
return s; return s;
} }
void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) { void syncUtilU642Addr(uint64_t u64, char *host, int64_t len, uint16_t *port) {
uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF); uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF);
struct in_addr addr = {.s_addr = hostU32}; struct in_addr addr = {.s_addr = hostU32};
@ -524,7 +524,7 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) {
*port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
} }
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint64_t syncUtilAddr2U64(const char *host, uint16_t port) {
uint32_t hostU32 = taosGetIpv4FromFqdn(host); uint32_t hostU32 = taosGetIpv4FromFqdn(host);
if (hostU32 == (uint32_t)-1) { if (hostU32 == (uint32_t)-1) {
sError("failed to resolve ipv4 addr, host:%s", host); sError("failed to resolve ipv4 addr, host:%s", host);

View File

@ -345,7 +345,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
} }
*ppKey = pTKey; *ppKey = pTKey;
*pkLen = cd.kLen; *pkLen = cd.kLen;
memcpy(*ppKey, cd.pKey, cd.kLen); memcpy(*ppKey, cd.pKey, (size_t)cd.kLen);
} }
if (ppVal) { if (ppVal) {
@ -357,7 +357,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
} }
*ppVal = pTVal; *ppVal = pTVal;
*vLen = cd.vLen; *vLen = cd.vLen;
memcpy(*ppVal, cd.pVal, cd.vLen); memcpy(*ppVal, cd.pVal, (size_t)cd.vLen);
} }
if (TDB_CELLDECODER_FREE_KEY(&cd)) { if (TDB_CELLDECODER_FREE_KEY(&cd)) {
@ -1793,7 +1793,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
*ppKey = pKey; *ppKey = pKey;
*kLen = cd.kLen; *kLen = cd.kLen;
memcpy(pKey, cd.pKey, cd.kLen); memcpy(pKey, cd.pKey, (size_t)cd.kLen);
if (ppVal) { if (ppVal) {
if (cd.vLen > 0) { if (cd.vLen > 0) {
@ -1852,7 +1852,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
*ppKey = pKey; *ppKey = pKey;
*kLen = cd.kLen; *kLen = cd.kLen;
memcpy(pKey, cd.pKey, cd.kLen); memcpy(pKey, cd.pKey, (size_t)cd.kLen);
if (ppVal) { if (ppVal) {
// TODO: vLen may be zero // TODO: vLen may be zero
@ -1864,7 +1864,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
*ppVal = pVal; *ppVal = pVal;
*vLen = cd.vLen; *vLen = cd.vLen;
memcpy(pVal, cd.pVal, cd.vLen); memcpy(pVal, cd.pVal, (size_t)cd.vLen);
} }
ret = tdbBtcMoveToPrev(pBtc); ret = tdbBtcMoveToPrev(pBtc);

View File

@ -62,7 +62,10 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
} }
memset(pDb->pgrHash, 0, tsize); memset(pDb->pgrHash, 0, tsize);
taosMulModeMkDir(dbname, 0755); ret = taosMulModeMkDir(dbname, 0755);
if (ret < 0) {
return -1;
}
#ifdef USE_MAINDB #ifdef USE_MAINDB
// open main db // open main db

View File

@ -980,6 +980,7 @@ int tdbPagerRestoreJournals(SPager *pPager) {
jname[dirLen] = '/'; jname[dirLen] = '/';
sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId); sprintf(jname + dirLen + 1, TDB_MAINDB_NAME "-journal.%" PRId64, *pTxnId);
if (tdbPagerRestore(pPager, jname) < 0) { if (tdbPagerRestore(pPager, jname) < 0) {
taosArrayDestroy(pTxnList);
tdbCloseDir(&pDir); tdbCloseDir(&pDir);
tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname); tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), jname);

View File

@ -154,6 +154,7 @@ typedef struct {
#pragma pack(push, 1) #pragma pack(push, 1)
#define TRANS_VER 2
typedef struct { typedef struct {
char version : 4; // RPC version char version : 4; // RPC version
char comp : 2; // compression algorithm, 0:no compression 1:lz4 char comp : 2; // compression algorithm, 0:no compression 1:lz4
@ -166,6 +167,7 @@ typedef struct {
uint64_t timestamp; uint64_t timestamp;
char user[TSDB_UNI_LEN]; char user[TSDB_UNI_LEN];
int32_t compatibilityVer;
uint32_t magicNum; uint32_t magicNum;
STraceId traceId; STraceId traceId;
uint64_t ahandle; // ahandle assigned by client uint64_t ahandle; // ahandle assigned by client

View File

@ -46,10 +46,10 @@ typedef struct {
int8_t connType; int8_t connType;
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
int32_t compatibilityVer;
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not int8_t encryption; // encrypt or not
int32_t retryMinInterval; // retry init interval int32_t retryMinInterval; // retry init interval
int32_t retryStepFactor; // retry interval factor int32_t retryStepFactor; // retry interval factor
int32_t retryMaxInterval; // retry max interval int32_t retryMaxInterval; // retry max interval

View File

@ -50,6 +50,7 @@ void* rpcOpen(const SRpcInit* pInit) {
} }
pRpc->encryption = pInit->encryption; pRpc->encryption = pInit->encryption;
pRpc->compatibilityVer = pInit->compatibilityVer;
pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval
pRpc->retryStepFactor = pInit->retryStepFactor; pRpc->retryStepFactor = pInit->retryStepFactor;

View File

@ -391,6 +391,7 @@ void cliHandleResp(SCliConn* conn) {
transMsg.info.ahandle = NULL; transMsg.info.ahandle = NULL;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
transMsg.info.hasEpSet = pHead->hasEpSet; transMsg.info.hasEpSet = pHead->hasEpSet;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
SCliMsg* pMsg = NULL; SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = NULL; STransConnCtx* pCtx = NULL;
@ -488,6 +489,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) {
transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code; transMsg.code = code == -1 ? (pConn->broken ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_NETWORK_UNAVAIL) : code;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.info.ahandle = NULL; transMsg.info.ahandle = NULL;
transMsg.info.cliVer = pTransInst->compatibilityVer;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
@ -984,11 +986,10 @@ void cliSendBatch(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SCliBatch* pBatch = pConn->pBatch; SCliBatch* pBatch = pConn->pBatch;
SCliBatchList* pList = pBatch->pList; int32_t wLen = pBatch->wLen;
pList->connCnt += 1;
int32_t wLen = pBatch->wLen; pBatch->pList->connCnt += 1;
uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t));
int i = 0; int i = 0;
@ -1018,6 +1019,8 @@ void cliSendBatch(SCliConn* pConn) {
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->version = TRANS_VER;
pHead->compatibilityVer = htonl(pTransInst->compatibilityVer);
} }
pHead->timestamp = taosHton64(taosGetTimestampUs()); pHead->timestamp = taosHton64(taosGetTimestampUs());
@ -1074,6 +1077,8 @@ void cliSend(SCliConn* pConn) {
memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user)); memcpy(pHead->user, pTransInst->user, strlen(pTransInst->user));
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->version = TRANS_VER;
pHead->compatibilityVer = htonl(pTransInst->compatibilityVer);
} }
pHead->timestamp = taosHton64(taosGetTimestampUs()); pHead->timestamp = taosHton64(taosGetTimestampUs());
@ -1346,6 +1351,7 @@ static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.ahandle = pMsg->ctx->ahandle;
transMsg.info.traceId = pMsg->msg.info.traceId; transMsg.info.traceId = pMsg->msg.info.traceId;
transMsg.info.hasEpSet = false; transMsg.info.hasEpSet = false;
transMsg.info.cliVer = pTransInst->compatibilityVer;
if (pCtx->pSem != NULL) { if (pCtx->pSem != NULL) {
if (pCtx->pRsp == NULL) { if (pCtx->pRsp == NULL) {
} else { } else {
@ -1527,6 +1533,9 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
// persist conn already release by server // persist conn already release by server
STransMsg resp; STransMsg resp;
cliBuildExceptResp(pMsg, &resp); cliBuildExceptResp(pMsg, &resp);
// refactorr later
resp.info.cliVer = pTransInst->compatibilityVer;
if (pMsg->type != Release) { if (pMsg->type != Release) {
pTransInst->cfp(pTransInst->parent, &resp, NULL); pTransInst->cfp(pTransInst->parent, &resp, NULL);
} }
@ -1836,6 +1845,7 @@ void cliIteraConnMsgs(SCliConn* conn) {
if (-1 == cliBuildExceptResp(cmsg, &resp)) { if (-1 == cliBuildExceptResp(cmsg, &resp)) {
continue; continue;
} }
resp.info.cliVer = pTransInst->compatibilityVer;
pTransInst->cfp(pTransInst->parent, &resp, NULL); pTransInst->cfp(pTransInst->parent, &resp, NULL);
cmsg->ctx->ahandle = NULL; cmsg->ctx->ahandle = NULL;

View File

@ -192,7 +192,7 @@ bool transReadComplete(SConnBuffer* connBuf) {
memcpy((char*)&head, connBuf->buf, sizeof(head)); memcpy((char*)&head, connBuf->buf, sizeof(head));
int32_t msgLen = (int32_t)htonl(head.msgLen); int32_t msgLen = (int32_t)htonl(head.msgLen);
p->total = msgLen; p->total = msgLen;
p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)); p->invalid = TRANS_NOVALID_PACKET(htonl(head.magicNum)) || head.version != TRANS_VER;
} }
if (p->total >= p->len) { if (p->total >= p->len) {
p->left = p->total - p->len; p->left = p->total - p->len;

View File

@ -196,6 +196,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
return false; return false;
} }
tDebug("head version: %d 2", pHead->version);
pHead->code = htonl(pHead->code); pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen); pHead->msgLen = htonl(pHead->msgLen);
@ -236,8 +237,8 @@ static bool uvHandleReq(SSvrConn* pConn) {
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
if (cost >= EXCEPTION_LIMIT_US) { if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception", transLabel(pTransInst), tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus, recv exception",
pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
} else { } else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn, tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, cost:%dus", transLabel(pTransInst), pConn,
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost); TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, (int)cost);
@ -245,8 +246,8 @@ static bool uvHandleReq(SSvrConn* pConn) {
} else { } else {
if (cost >= EXCEPTION_LIMIT_US) { if (cost >= EXCEPTION_LIMIT_US) {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception", tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus, recv exception",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
transMsg.code, (int)(cost)); transMsg.code, (int)(cost));
} else { } else {
tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus", tGDebug("%s conn %p %s received from %s, local info:%s, len:%d, noResp:%d, code:%d, cost:%dus",
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp, transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, msgLen, pHead->noResp,
@ -262,6 +263,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn->refId); pConn->refId);
@ -410,6 +412,8 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
pHead->traceId = pMsg->info.traceId; pHead->traceId = pMsg->info.traceId;
pHead->hasEpSet = pMsg->info.hasEpSet; pHead->hasEpSet = pMsg->info.hasEpSet;
pHead->magicNum = htonl(TRANS_MAGIC_NUM); pHead->magicNum = htonl(TRANS_MAGIC_NUM);
pHead->compatibilityVer = htonl(((STrans*)pConn->pTransInst)->compatibilityVer);
pHead->version = TRANS_VER;
// handle invalid drop_task resp, TD-20098 // handle invalid drop_task resp, TD-20098
if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {

View File

@ -19,6 +19,7 @@
#include "transLog.h" #include "transLog.h"
#include "trpc.h" #include "trpc.h"
#include "tutil.h" #include "tutil.h"
#include "tversion.h"
typedef struct { typedef struct {
int index; int index;
@ -155,7 +156,7 @@ int main(int argc, char *argv[]) {
} }
initLogEnv(); initLogEnv();
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
tError("failed to initialize RPC"); tError("failed to initialize RPC");

View File

@ -13,12 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
//#define _DEFAULT_SOURCE // #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "transLog.h" #include "transLog.h"
#include "trpc.h" #include "trpc.h"
#include "tversion.h"
int msgSize = 128; int msgSize = 128;
int commit = 0; int commit = 0;
@ -151,6 +152,8 @@ int main(int argc, char *argv[]) {
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processRequestMsg; rpcInit.cfp = processRequestMsg;
rpcInit.idleTime = 2 * 1500; rpcInit.idleTime = 2 * 1500;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
rpcDebugFlag = 131; rpcDebugFlag = 131;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
@ -187,7 +190,7 @@ int main(int argc, char *argv[]) {
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
initLogEnv(); initLogEnv();
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
void *pRpc = rpcOpen(&rpcInit); void *pRpc = rpcOpen(&rpcInit);
if (pRpc == NULL) { if (pRpc == NULL) {
tError("failed to start RPC server"); tError("failed to start RPC server");

View File

@ -18,10 +18,10 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "tmisce.h"
#include "transLog.h" #include "transLog.h"
#include "trpc.h" #include "trpc.h"
#include "tmisce.h" #include "tversion.h"
using namespace std; using namespace std;
const char *label = "APP"; const char *label = "APP";
@ -54,6 +54,8 @@ class Client {
rpcInit_.user = (char *)user; rpcInit_.user = (char *)user;
rpcInit_.parent = this; rpcInit_.parent = this;
rpcInit_.connType = TAOS_CONN_CLIENT; rpcInit_.connType = TAOS_CONN_CLIENT;
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
this->transCli = rpcOpen(&rpcInit_); this->transCli = rpcOpen(&rpcInit_);
tsem_init(&this->sem, 0, 0); tsem_init(&this->sem, 0, 0);
} }
@ -66,6 +68,7 @@ class Client {
void Restart(CB cb) { void Restart(CB cb) {
rpcClose(this->transCli); rpcClose(this->transCli);
rpcInit_.cfp = cb; rpcInit_.cfp = cb;
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
this->transCli = rpcOpen(&rpcInit_); this->transCli = rpcOpen(&rpcInit_);
} }
void Stop() { void Stop() {
@ -117,6 +120,7 @@ class Server {
rpcInit_.cfp = processReq; rpcInit_.cfp = processReq;
rpcInit_.user = (char *)user; rpcInit_.user = (char *)user;
rpcInit_.connType = TAOS_CONN_SERVER; rpcInit_.connType = TAOS_CONN_SERVER;
taosVersionStrToInt(version, &(rpcInit_.compatibilityVer));
} }
void Start() { void Start() {
this->transSrv = rpcOpen(&this->rpcInit_); this->transSrv = rpcOpen(&this->rpcInit_);

View File

@ -152,6 +152,7 @@ class TDTestCase:
os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql") os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;" cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}")
if os.system(cmd) == 0: if os.system(cmd) == 0:
raise Exception("failed to execute system command. cmd: %s" % cmd) raise Exception("failed to execute system command. cmd: %s" % cmd)

View File

@ -256,6 +256,7 @@ class TDTestCase:
tdLog.info("all consumers status into 'lost'") tdLog.info("all consumers status into 'lost'")
# drop consumer groups # drop consumer groups
tdLog.info("drop all consumers")
for i in range(len(groupIdList)): for i in range(len(groupIdList)):
for j in range(len(topicNameList)): for j in range(len(topicNameList)):
sqlCmd = f"drop consumer group `%s` on %s"%(groupIdList[i], topicNameList[j]) sqlCmd = f"drop consumer group `%s` on %s"%(groupIdList[i], topicNameList[j])
@ -265,7 +266,17 @@ class TDTestCase:
tmqCom.g_end_insert_flag = 1 tmqCom.g_end_insert_flag = 1
tdLog.debug("notify sub-thread to stop insert data") tdLog.debug("notify sub-thread to stop insert data")
pThread.join() pThread.join()
tdSql.query('show consumers;')
consumerNUm = tdSql.queryRows
tdSql.query('show subscriptions;')
subscribeNum = tdSql.queryRows
if (0 != consumerNUm or 0 != subscribeNum):
tdLog.exit("drop consumer fail! consumerNUm %d, subscribeNum: %d"%(consumerNUm, subscribeNum))
tdLog.info("drop consuer success, there is no consumers and subscribes")
tdLog.printNoPrefix("======== test case 1 end ...... ") tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self): def run(self):

View File

@ -15,6 +15,7 @@
#define _GNU_SOURCE #define _GNU_SOURCE
#include "shellInt.h" #include "shellInt.h"
#include "tversion.h"
static void shellWorkAsClient() { static void shellWorkAsClient() {
SShellArgs *pArgs = &shell.args; SShellArgs *pArgs = &shell.args;
@ -33,6 +34,7 @@ static void shellWorkAsClient() {
rpcInit.user = "_dnd"; rpcInit.user = "_dnd";
rpcInit.timeToGetConn = tsTimeToGetAvailableConn; rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) { if (clientRpc == NULL) {
printf("failed to init net test client since %s\r\n", terrstr()); printf("failed to init net test client since %s\r\n", terrstr());
@ -123,6 +125,8 @@ static void shellWorkAsServer() {
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
void *serverRpc = rpcOpen(&rpcInit); void *serverRpc = rpcOpen(&rpcInit);
if (serverRpc == NULL) { if (serverRpc == NULL) {
printf("failed to init net test server since %s\r\n", terrstr()); printf("failed to init net test server since %s\r\n", terrstr());