Merge branch '3.0' into enh/opt-transport
This commit is contained in:
commit
b8e782e55f
|
@ -132,7 +132,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
|
|||
pip3 install taospy[ws]
|
||||
```
|
||||
|
||||
- **安装验证**
|
||||
- **安装验证**
|
||||
<Tabs defaultValue="rest">
|
||||
<TabItem value="native" label="原生连接">
|
||||
对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入:
|
||||
|
@ -198,18 +198,18 @@ taos = { version = "*", default-features = false, features = ["ws"] }
|
|||
|
||||
- **安装**
|
||||
- 使用 npm 安装 Node.js 连接器
|
||||
```
|
||||
npm install @tdengine/websocket
|
||||
```
|
||||
```
|
||||
npm install @tdengine/websocket
|
||||
```
|
||||
:::note Node.js 目前只支持 Websocket 连接
|
||||
- **安装验证**
|
||||
- 新建安装验证目录,例如:`~/tdengine-test`,下载 GitHub 上 [nodejsChecker.js 源代码](https://github.com/taosdata/TDengine/tree/main/docs/examples/node/websocketexample/nodejsChecker.js)到本地。
|
||||
- 在命令行中执行以下命令。
|
||||
```bash
|
||||
npm init -y
|
||||
npm install @tdengine/websocket
|
||||
node nodejsChecker.js
|
||||
```
|
||||
```bash
|
||||
npm init -y
|
||||
npm install @tdengine/websocket
|
||||
node nodejsChecker.js
|
||||
```
|
||||
- 执行以上步骤后,在命令行会输出 nodeChecker.js 连接 TDengine 实例,并执行简单插入和查询的结果。
|
||||
|
||||
|
||||
|
|
|
@ -118,13 +118,14 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name][?tz=timez
|
|||
|
||||
### HTTP 响应码
|
||||
|
||||
从 `TDengine 3.0.3.0` 开始 `taosAdapter` 提供配置参数 `httpCodeServerError` 用来设置当 C 接口返回错误时是否返回非 200 的http状态码
|
||||
从 `TDengine 3.0.3.0` 开始 `taosAdapter` 提供配置参数 `httpCodeServerError` 用来设置当 C 接口返回错误时是否返回非 200 的http状态码。
|
||||
无论是否设置此参数,响应 body 里都有详细的错误码和错误信息,具体请参考 [错误](../rest-api/#错误) 。
|
||||
|
||||
| **说明** | **httpCodeServerError false** | **httpCodeServerError true** |
|
||||
|--------------------|-------------------------------|---------------------------------------|
|
||||
| taos_errno() 返回 0 | 200 | 200 |
|
||||
| taos_errno() 返回 非0 | 200(除鉴权错误) | 500 (除鉴权错误和 400/502 错误) |
|
||||
| 参数错误 | 400 (仅处理 HTTP 请求 URL 参数错误) | 400 (处理 HTTP 请求 URL 参数错误和 taosd 返回错误) |
|
||||
| taos_errno() 返回 非0 | 200(除鉴权错误) | 500 (除鉴权错误和 400/502/503 错误) |
|
||||
| 参数错误 | 400(仅处理 HTTP 请求 URL 参数错误) | 400 (处理 HTTP 请求 URL 参数错误和 taosd 返回错误) |
|
||||
| 鉴权错误 | 401 | 401 |
|
||||
| 接口不存在 | 404 | 404 |
|
||||
| 集群不可用错误 | 502 | 502 |
|
||||
|
@ -132,27 +133,29 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name][?tz=timez
|
|||
|
||||
返回 400 的 C 错误码为:
|
||||
|
||||
- TSDB_CODE_TSC_SQL_SYNTAX_ERROR ( 0x0216)
|
||||
- TSDB_CODE_TSC_LINE_SYNTAX_ERROR (0x021B)
|
||||
- TSDB_CODE_TSC_SQL_SYNTAX_ERROR (0x0216)
|
||||
- TSDB_CODE_TSC_LINE_SYNTAX_ERROR (0x021B)
|
||||
- TSDB_CODE_PAR_SYNTAX_ERROR (0x2600)
|
||||
- TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE (0x060B)
|
||||
- TSDB_CODE_TSC_VALUE_OUT_OF_RANGE (0x0224)
|
||||
- TSDB_CODE_TSC_VALUE_OUT_OF_RANGE (0x0224)
|
||||
- TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE (0x263B)
|
||||
|
||||
返回 401 的错误码为:
|
||||
|
||||
- TSDB_CODE_MND_USER_ALREADY_EXIST (0x0350)
|
||||
- TSDB_CODE_MND_USER_NOT_EXIST ( 0x0351)
|
||||
- TSDB_CODE_MND_INVALID_USER_FORMAT (0x0352)
|
||||
- TSDB_CODE_MND_INVALID_PASS_FORMAT (0x0353)
|
||||
- TSDB_CODE_MND_USER_ALREADY_EXIST (0x0350)
|
||||
- TSDB_CODE_MND_USER_NOT_EXIST (0x0351)
|
||||
- TSDB_CODE_MND_INVALID_USER_FORMAT (0x0352)
|
||||
- TSDB_CODE_MND_INVALID_PASS_FORMAT (0x0353)
|
||||
- TSDB_CODE_MND_NO_USER_FROM_CONN (0x0354)
|
||||
- TSDB_CODE_MND_TOO_MANY_USERS (0x0355)
|
||||
- TSDB_CODE_MND_INVALID_ALTER_OPER (0x0356)
|
||||
- TSDB_CODE_MND_AUTH_FAILURE (0x0357)
|
||||
|
||||
返回 403 的错误码为:
|
||||
返回 502 的错误码为:
|
||||
|
||||
- TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED (0x0020)
|
||||
- TSDB_CODE_RPC_NETWORK_UNAVAIL (0x000B)
|
||||
|
||||
错误码和错误描述请参考[错误码](../../../reference/error-code)
|
||||
|
||||
### HTTP body 结构
|
||||
|
||||
|
@ -333,6 +336,8 @@ curl --location 'http://<fqdn>:<port>/rest/sql' \
|
|||
- code:(`int`)错误码。
|
||||
- desc:(`string`)错误描述。
|
||||
|
||||
错误码和错误描述请参考[错误码](../../../reference/error-code)
|
||||
|
||||
#### 返回key-value形式数据
|
||||
|
||||
当指定 url 参数 `row_with_meta=true` 时,返回的 data 中的数据会从数组的形式变成对象的形式,对象的 key 为列名,value 为数据,如下所示:
|
||||
|
|
|
@ -494,6 +494,14 @@ typedef struct SScanWalInfo {
|
|||
tmr_h scanTimer;
|
||||
} SScanWalInfo;
|
||||
|
||||
typedef struct SFatalErrInfo {
|
||||
int32_t code;
|
||||
int64_t ts;
|
||||
int32_t threadId;
|
||||
int32_t line;
|
||||
char func[128];
|
||||
} SFatalErrInfo;
|
||||
|
||||
// meta
|
||||
typedef struct SStreamMeta {
|
||||
char* path;
|
||||
|
@ -523,14 +531,13 @@ typedef struct SStreamMeta {
|
|||
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
|
||||
int32_t numOfPausedTasks;
|
||||
int64_t rid;
|
||||
|
||||
int64_t chkpId;
|
||||
int32_t chkpCap;
|
||||
SArray* chkpSaved;
|
||||
SArray* chkpInUse;
|
||||
SRWLatch chkpDirLock;
|
||||
void* qHandle; // todo remove it
|
||||
void* bkdChkptMgt;
|
||||
SFatalErrInfo fatalInfo; // fatal error occurs, stream stop to execute
|
||||
int64_t chkpId;
|
||||
int32_t chkpCap;
|
||||
SArray* chkpSaved;
|
||||
SArray* chkpInUse;
|
||||
SRWLatch chkpDirLock;
|
||||
void* bkdChkptMgt;
|
||||
} SStreamMeta;
|
||||
|
||||
typedef struct STaskUpdateEntry {
|
||||
|
@ -776,6 +783,9 @@ void streamMetaRLock(SStreamMeta* pMeta);
|
|||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||
void streamMetaWLock(SStreamMeta* pMeta);
|
||||
void streamMetaWUnLock(SStreamMeta* pMeta);
|
||||
void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino);
|
||||
int32_t streamGetFatalError(const SStreamMeta* pMeta);
|
||||
|
||||
void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId);
|
||||
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pTaskList);
|
||||
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
||||
|
@ -791,7 +801,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts);
|
|||
|
||||
// timer
|
||||
int32_t streamTimerGetInstance(tmr_h* pTmr);
|
||||
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
|
||||
void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId,
|
||||
const char* pMsg);
|
||||
void streamTmrStop(tmr_h tmrId);
|
||||
|
||||
|
|
|
@ -166,7 +166,6 @@ int32_t taosGetFqdn(char *);
|
|||
void tinet_ntoa(char *ipstr, uint32_t ip);
|
||||
uint32_t ip2uint(const char *const ip_addr);
|
||||
int32_t taosIgnSIGPIPE();
|
||||
uint32_t taosInetAddr(const char *ipAddr);
|
||||
const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len);
|
||||
|
||||
uint64_t taosHton64(uint64_t val);
|
||||
|
|
|
@ -80,7 +80,7 @@ typedef struct {
|
|||
|
||||
SysNameInfo taosGetSysNameInfo();
|
||||
bool taosCheckCurrentInDll();
|
||||
int taosGetlocalhostname(char *hostname, size_t maxLen);
|
||||
int32_t taosGetlocalhostname(char *hostname, size_t maxLen);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -81,8 +81,11 @@ int32_t taosResetTerminalMode();
|
|||
unw_get_reg(&cursor, UNW_REG_IP, &pc); \
|
||||
fname[0] = '\0'; \
|
||||
(void)unw_get_proc_name(&cursor, fname, sizeof(fname), &offset); \
|
||||
size += 1; \
|
||||
array[size] = (char *)taosMemoryMalloc(sizeof(char) * STACKSIZE + 1); \
|
||||
if(NULL == array[size]) { \
|
||||
break; \
|
||||
} \
|
||||
size += 1; \
|
||||
snprintf(array[size], STACKSIZE, "0x%lx : (%s+0x%lx) [0x%lx]\n", (long)pc, fname, (long)offset, (long)pc); \
|
||||
} \
|
||||
if (ignoreNum < size && size > 0) { \
|
||||
|
|
|
@ -2043,7 +2043,7 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
|
|||
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
|
||||
int32_t idx = -1;
|
||||
iconv_t conv = taosAcquireConv(&idx, C2M);
|
||||
if (!conv) return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
int32_t type = pResultInfo->fields[i].type;
|
||||
|
@ -2240,6 +2240,10 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
} else if (tTagIsJson(data)) {
|
||||
char* jsonString = NULL;
|
||||
parseTagDatatoJson(data, &jsonString);
|
||||
if(jsonString == NULL) {
|
||||
tscError("doConvertJson error: parseTagDatatoJson failed");
|
||||
return terrno;
|
||||
}
|
||||
STR_TO_VARSTR(dst, jsonString);
|
||||
taosMemoryFree(jsonString);
|
||||
} else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
|
||||
|
|
|
@ -417,6 +417,10 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
|
|||
}
|
||||
char* pJson = NULL;
|
||||
parseTagDatatoJson(pTag, &pJson);
|
||||
if(pJson == NULL) {
|
||||
uError("parseTagDatatoJson failed, pJson == NULL");
|
||||
goto end;
|
||||
}
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
RAW_NULL_CHECK(tag);
|
||||
STagVal* pTagVal = taosArrayGet(pTagVals, 0);
|
||||
|
@ -727,6 +731,10 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
|||
goto end;
|
||||
}
|
||||
parseTagDatatoJson(vAlterTbReq.pTagVal, &buf);
|
||||
if(buf == NULL) {
|
||||
uError("parseTagDatatoJson failed, buf == NULL");
|
||||
goto end;
|
||||
}
|
||||
} else {
|
||||
if (vAlterTbReq.tagType == TSDB_DATA_TYPE_VARBINARY) {
|
||||
buf = taosMemoryCalloc(vAlterTbReq.nTagVal * 2 + 2 + 3, 1);
|
||||
|
|
|
@ -194,6 +194,12 @@ static int32_t stmtUpdateBindInfo(TAOS_STMT2* stmt, STableMeta* pTableMeta, void
|
|||
pStmt->bInfo.tbSuid = pTableMeta->suid;
|
||||
pStmt->bInfo.tbVgId = pTableMeta->vgId;
|
||||
pStmt->bInfo.tbType = pTableMeta->tableType;
|
||||
|
||||
if (!pStmt->bInfo.tagsCached) {
|
||||
qDestroyBoundColInfo(pStmt->bInfo.boundTags);
|
||||
taosMemoryFreeClear(pStmt->bInfo.boundTags);
|
||||
}
|
||||
|
||||
pStmt->bInfo.boundTags = tags;
|
||||
pStmt->bInfo.tagsCached = false;
|
||||
tstrncpy(pStmt->bInfo.stbFName, sTableName, sizeof(pStmt->bInfo.stbFName));
|
||||
|
@ -985,10 +991,6 @@ int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pStmt->bInfo.inExecCache) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableDataCxt** pDataBlock =
|
||||
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
|
||||
if (NULL == pDataBlock) {
|
||||
|
@ -996,6 +998,10 @@ int stmtSetTbTags2(TAOS_STMT2* stmt, TAOS_STMT2_BIND* tags) {
|
|||
STMT_ERR_RET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
|
||||
if (pStmt->bInfo.inExecCache && (!pStmt->sql.autoCreateTbl || (*pDataBlock)->pData->pCreateTbReq)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tscDebug("start to bind stmt tag values");
|
||||
STMT_ERR_RET(qBindStmtTagsValue2(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.tbSuid, pStmt->bInfo.stbFName,
|
||||
pStmt->bInfo.sname.tname, tags, pStmt->exec.pRequest->msgBuf,
|
||||
|
|
|
@ -1143,18 +1143,16 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
|
|||
return taosArrayGetSize(execInfo.pNodeList);
|
||||
}
|
||||
|
||||
static bool taskNodeIsUpdated(SMnode *pMnode) {
|
||||
bool allReady = true;
|
||||
SArray *pNodeSnapshot = NULL;
|
||||
|
||||
// check if the node update happens or not
|
||||
streamMutexLock(&execInfo.lock);
|
||||
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
|
||||
bool allReady = false;
|
||||
bool nodeUpdated = false;
|
||||
SVgroupChangeInfo changeInfo = {0};
|
||||
|
||||
int32_t numOfNodes = extractStreamNodeList(pMnode);
|
||||
|
||||
if (numOfNodes == 0) {
|
||||
mDebug("stream task node change checking done, no vgroups exist, do nothing");
|
||||
execInfo.ts = taosGetTimestampSec();
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1166,43 +1164,46 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
|
|||
|
||||
if (pNodeEntry->stageUpdated) {
|
||||
mDebug("stream task not ready due to node update detected, checkpoint not issued");
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
|
||||
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
|
||||
if (code) {
|
||||
mError("failed to get the vgroup snapshot, ignore it and continue");
|
||||
}
|
||||
|
||||
if (!allReady) {
|
||||
mWarn("not all vnodes ready, quit from vnodes status check");
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return true;
|
||||
}
|
||||
|
||||
SVgroupChangeInfo changeInfo = {0};
|
||||
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
|
||||
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
|
||||
if (code) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return false;
|
||||
nodeUpdated = false;
|
||||
} else {
|
||||
nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||
if (nodeUpdated) {
|
||||
mDebug("stream tasks not ready due to node update");
|
||||
}
|
||||
}
|
||||
|
||||
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||
|
||||
mndDestroyVgroupChangeInfo(&changeInfo);
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
|
||||
if (nodeUpdated) {
|
||||
mDebug("stream tasks not ready due to node update");
|
||||
}
|
||||
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return nodeUpdated;
|
||||
}
|
||||
|
||||
// check if the node update happens or not
|
||||
static bool taskNodeIsUpdated(SMnode *pMnode) {
|
||||
SArray *pNodeSnapshot = NULL;
|
||||
|
||||
streamMutexLock(&execInfo.lock);
|
||||
bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
return updated;
|
||||
}
|
||||
|
||||
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||
bool ready = true;
|
||||
if (taskNodeIsUpdated(pMnode)) {
|
||||
|
@ -1993,7 +1994,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis
|
|||
|
||||
if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
|
||||
mndDestroyVgroupChangeInfo(pInfo);
|
||||
return terrno;
|
||||
TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
|
||||
}
|
||||
|
||||
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
|
||||
|
@ -2048,6 +2049,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis
|
|||
return code;
|
||||
|
||||
_err:
|
||||
mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
|
||||
mndDestroyVgroupChangeInfo(pInfo);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -342,7 +342,8 @@ int32_t mndUpdateIpWhiteImpl(SHashObj *pIpWhiteTab, char *user, char *fqdn, int8
|
|||
SIpV4Range range = {.ip = 0, .mask = 32};
|
||||
int32_t code = taosGetIpv4FromFqdn(fqdn, &range.ip);
|
||||
if (code) {
|
||||
//TODO
|
||||
mError("failed to get ip from fqdn: %s at line %d since %s", fqdn, lino, tstrerror(code));
|
||||
TAOS_RETURN(TSDB_CODE_TSC_INVALID_FQDN);
|
||||
}
|
||||
mDebug("ip-white-list may update for user: %s, fqdn: %s", user, fqdn);
|
||||
SIpWhiteList **ppList = taosHashGet(pIpWhiteTab, user, strlen(user));
|
||||
|
|
|
@ -160,6 +160,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
int32_t gError = streamGetFatalError(pMeta);
|
||||
if (gError != 0) {
|
||||
tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
|
||||
pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// update the nodeEpset when it exists
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
|
@ -290,8 +297,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
||||
updateTasks, (numOfTasks - updateTasks));
|
||||
} else {
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
// persist to disk
|
||||
if ((code = streamMetaCommit(pMeta)) < 0) {
|
||||
// always return true
|
||||
streamMetaWUnLock(pMeta);
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
streamMetaClearSetUpdateTaskListComplete(pMeta);
|
||||
|
@ -754,8 +764,9 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
|
|||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// always return success when handling the requirement issued by mnode during transaction.
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||
|
@ -1197,10 +1208,6 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret
|
|||
|
||||
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||
|
||||
int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
return doProcessDummyRspMsg(pMeta, pMsg);
|
||||
}
|
||||
|
||||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
|
||||
|
||||
|
@ -1221,14 +1228,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
SStreamTask* pTask = NULL;
|
||||
SRestoreCheckpointInfo req = {0};
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
int64_t now = taosGetTimestampMs();
|
||||
SDecoder decoder;
|
||||
SRestoreCheckpointInfo req = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
|
||||
if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) {
|
||||
tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -1239,16 +1245,15 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||
if (pTask == NULL || (code != 0)) {
|
||||
tqError(
|
||||
"vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
||||
pMeta->vgId, req.taskId);
|
||||
tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
||||
pMeta->vgId, req.taskId);
|
||||
// ignore this code to avoid error code over write
|
||||
int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||
if (ret) {
|
||||
tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
|
||||
}
|
||||
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// discard the rsp, since it is expired.
|
||||
|
@ -1272,7 +1277,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return TSDB_CODE_STREAM_INTERNAL_ERROR;
|
||||
return 0;
|
||||
}
|
||||
|
||||
SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
|
||||
|
@ -1299,10 +1304,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||
code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
|
||||
if (code) {
|
||||
tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
return 0;
|
||||
}
|
|
@ -243,7 +243,7 @@ static int32_t setTableSchema(SCacheRowsReader* p, uint64_t suid, const char* id
|
|||
code = metaGetTbTSchemaNotNull(p->pVnode->pMeta, suid, -1, 1, &p->pSchema);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
tsdbWarn("stable:%" PRIu64 " has been dropped, failed to retrieve cached rows, %s", suid, idstr);
|
||||
if(code != TSDB_CODE_OUT_OF_MEMORY) {
|
||||
if(code == TSDB_CODE_NOT_FOUND) {
|
||||
return TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
||||
} else {
|
||||
return code;
|
||||
|
|
|
@ -448,6 +448,10 @@ int32_t ctgGetTbTag(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName,
|
|||
|
||||
char* pJson = NULL;
|
||||
parseTagDatatoJson(pTag, &pJson);
|
||||
if(NULL == pJson) {
|
||||
taosArrayDestroy(pTagVals);
|
||||
CTG_ERR_JRET(terrno);
|
||||
}
|
||||
STagVal tagVal;
|
||||
tagVal.cid = 0;
|
||||
tagVal.type = TSDB_DATA_TYPE_JSON;
|
||||
|
|
|
@ -2093,6 +2093,10 @@ int32_t ctgHandleGetTbTagRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
|
|||
|
||||
char* pJson = NULL;
|
||||
parseTagDatatoJson(pTag, &pJson);
|
||||
if (NULL == pJson) {
|
||||
taosArrayDestroy(pTagVals);
|
||||
CTG_ERR_JRET(terrno);
|
||||
}
|
||||
STagVal tagVal;
|
||||
tagVal.cid = 0;
|
||||
tagVal.type = TSDB_DATA_TYPE_JSON;
|
||||
|
|
|
@ -3710,6 +3710,9 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
|
|||
}
|
||||
|
||||
*pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
|
||||
if (NULL == *pVgroup) {
|
||||
CTG_ERR_JRET(terrno);
|
||||
}
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, NULL, dbCache->vgCache.vgInfo, pTableName, *pVgroup));
|
||||
|
||||
_return:
|
||||
|
|
|
@ -291,6 +291,10 @@ int32_t ctgdHandleDbgCommand(char *command) {
|
|||
}
|
||||
|
||||
char *dup = taosStrdup(command);
|
||||
if (NULL == dup) {
|
||||
CTG_RET(terrno);
|
||||
}
|
||||
|
||||
char *option = NULL;
|
||||
char *param = NULL;
|
||||
|
||||
|
|
|
@ -565,6 +565,10 @@ int32_t appendTagValues(char* buf, int32_t* len, STableCfg* pCfg) {
|
|||
if (tTagIsJson(pTag)) {
|
||||
char* pJson = NULL;
|
||||
parseTagDatatoJson(pTag, &pJson);
|
||||
if(NULL == pJson) {
|
||||
qError("failed to parse tag to json, pJson is NULL");
|
||||
return terrno;
|
||||
}
|
||||
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s", pJson);
|
||||
taosMemoryFree(pJson);
|
||||
|
||||
|
|
|
@ -149,6 +149,9 @@ int32_t mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool*
|
|||
len = varDataTLen(p1);
|
||||
}
|
||||
char* p2 = taosMemoryMalloc(len);
|
||||
if (NULL == p2) {
|
||||
MJ_ERR_RET(terrno);
|
||||
}
|
||||
TAOS_MEMCPY(p2, p1, len);
|
||||
code = colDataSetVal(pDst, numOfRows, p2, false);
|
||||
if (code) {
|
||||
|
|
|
@ -1114,6 +1114,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
|
|||
if (tagType == TSDB_DATA_TYPE_JSON) {
|
||||
char* tagJson = NULL;
|
||||
parseTagDatatoJson(tagData, &tagJson);
|
||||
if (tagJson == NULL) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
tagVarChar = taosMemoryMalloc(strlen(tagJson) + VARSTR_HEADER_SIZE);
|
||||
QUERY_CHECK_NULL(tagVarChar, code, lino, _end, terrno);
|
||||
memcpy(varDataVal(tagVarChar), tagJson, strlen(tagJson));
|
||||
|
|
|
@ -900,6 +900,9 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
|
|||
udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
|
||||
int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
|
||||
udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
|
||||
if (NULL == udfCol->colData.fixLenCol.data) {
|
||||
return terrno;
|
||||
}
|
||||
char *data = udfCol->colData.fixLenCol.data;
|
||||
memcpy(data, col->pData, dataLen);
|
||||
}
|
||||
|
|
|
@ -85,13 +85,19 @@ int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfNa
|
|||
char mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
|
||||
char *mergeSuffix = "_merge";
|
||||
snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix);
|
||||
(void)(uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)));
|
||||
int ret = uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc));
|
||||
if (ret != 0) {
|
||||
fnInfo("uv_dlsym function %s. error: %s", mergeFuncName, uv_strerror(ret));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
|
||||
int32_t err = 0;
|
||||
SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
|
||||
if (NULL == udfCtx) {
|
||||
return terrno;
|
||||
}
|
||||
err = uv_dlopen(udf->path, &udfCtx->lib);
|
||||
if (err != 0) {
|
||||
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
|
||||
|
@ -606,6 +612,9 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
|
|||
|
||||
int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) {
|
||||
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
||||
if (NULL == udfNew) {
|
||||
return terrno;
|
||||
}
|
||||
udfNew->refCount = 1;
|
||||
udfNew->lastFetchTime = taosGetTimestampMs();
|
||||
strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);
|
||||
|
@ -1105,6 +1114,9 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
|||
taosArrayDestroy(retrieveReq.pFuncNames);
|
||||
|
||||
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
|
||||
if(NULL == msgInfo) {
|
||||
return terrno;
|
||||
}
|
||||
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
|
||||
msgInfo->param = udf;
|
||||
if(uv_sem_init(&msgInfo->resultSem, 0) != 0) {
|
||||
|
|
|
@ -892,11 +892,11 @@ int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TA
|
|||
if (NULL == tags) {
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
/*
|
||||
if (pDataBlock->pMeta->tableType != TSDB_SUPER_TABLE && pDataBlock->pMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
return TSDB_CODE_TSC_STMT_API_ERROR;
|
||||
}
|
||||
|
||||
*/
|
||||
SSchema* pSchema = getTableTagSchema(pDataBlock->pMeta);
|
||||
if (tags->numOfBound <= 0) {
|
||||
*fieldNum = 0;
|
||||
|
|
|
@ -514,6 +514,9 @@ end:
|
|||
taosArrayDestroy(pTagVals);
|
||||
if (string == NULL) {
|
||||
string = taosStrdup(TSDB_DATA_NULL_STR_L);
|
||||
if(string == NULL) {
|
||||
qError("failed to strdup null string");
|
||||
}
|
||||
}
|
||||
*jsonStr = string;
|
||||
}
|
||||
|
@ -629,6 +632,7 @@ int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
|
|||
(*pDst)->flags = pSrc->flags;
|
||||
if (pSrc->name) {
|
||||
(*pDst)->name = taosStrdup(pSrc->name);
|
||||
if (NULL == (*pDst)->name) goto _exit;
|
||||
}
|
||||
(*pDst)->uid = pSrc->uid;
|
||||
(*pDst)->btime = pSrc->btime;
|
||||
|
@ -636,21 +640,25 @@ int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
|
|||
(*pDst)->commentLen = pSrc->commentLen;
|
||||
if (pSrc->comment) {
|
||||
(*pDst)->comment = taosStrdup(pSrc->comment);
|
||||
if (NULL == (*pDst)->comment) goto _exit;
|
||||
}
|
||||
(*pDst)->type = pSrc->type;
|
||||
|
||||
if (pSrc->type == TSDB_CHILD_TABLE) {
|
||||
if (pSrc->ctb.stbName) {
|
||||
(*pDst)->ctb.stbName = taosStrdup(pSrc->ctb.stbName);
|
||||
if (NULL == (*pDst)->ctb.stbName) goto _exit;
|
||||
}
|
||||
(*pDst)->ctb.tagNum = pSrc->ctb.tagNum;
|
||||
(*pDst)->ctb.suid = pSrc->ctb.suid;
|
||||
if (pSrc->ctb.tagName) {
|
||||
(*pDst)->ctb.tagName = taosArrayDup(pSrc->ctb.tagName, NULL);
|
||||
if (NULL == (*pDst)->ctb.tagName) goto _exit;
|
||||
}
|
||||
STag* pTag = (STag*)pSrc->ctb.pTag;
|
||||
if (pTag) {
|
||||
(*pDst)->ctb.pTag = taosMemoryMalloc(pTag->len);
|
||||
if(NULL == (*pDst)->ctb.pTag) goto _exit;
|
||||
memcpy((*pDst)->ctb.pTag, pTag, pTag->len);
|
||||
}
|
||||
} else {
|
||||
|
@ -658,11 +666,17 @@ int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
|
|||
(*pDst)->ntb.schemaRow.version = pSrc->ntb.schemaRow.nCols;
|
||||
if (pSrc->ntb.schemaRow.nCols > 0 && pSrc->ntb.schemaRow.pSchema) {
|
||||
(*pDst)->ntb.schemaRow.pSchema = taosMemoryMalloc(pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
|
||||
if (NULL == (*pDst)->ntb.schemaRow.pSchema) goto _exit;
|
||||
memcpy((*pDst)->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.pSchema, pSrc->ntb.schemaRow.nCols * sizeof(SSchema));
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_exit:
|
||||
tdDestroySVCreateTbReq(*pDst);
|
||||
taosMemoryFree(*pDst);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
void freeDbCfgInfo(SDbCfgInfo* pInfo) {
|
||||
|
|
|
@ -85,6 +85,9 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
|
|||
|
||||
int32_t bufLen = tSerializeSTableInfoReq(NULL, 0, &infoReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSTableInfoReq(pBuf, bufLen, &infoReq) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -112,6 +115,9 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
|
|||
|
||||
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSUseDbReq(pBuf, bufLen, &usedbReq) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -133,6 +139,9 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
|
||||
int32_t bufLen = tSerializeSQnodeListReq(NULL, 0, &qnodeListReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -154,6 +163,9 @@ int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
|
||||
int32_t bufLen = tSerializeSDnodeListReq(NULL, 0, &dnodeListReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSDnodeListReq(pBuf, bufLen, &dnodeListReq) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -174,6 +186,9 @@ int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
|
||||
int32_t bufLen = tSerializeSServerVerReq(NULL, 0, &req);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSServerVerReq(pBuf, bufLen, &req) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -195,6 +210,9 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
|
||||
int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -216,6 +234,9 @@ int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
|
||||
int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSUserIndexReq(pBuf, bufLen, &indexReq) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -247,8 +268,13 @@ int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int3
|
|||
|
||||
int32_t bufLen = tSerializeSRetrieveFuncReq(NULL, 0, &funcReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
taosArrayDestroy(funcReq.pFuncNames);
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq) < 0)
|
||||
{
|
||||
taosArrayDestroy(funcReq.pFuncNames);
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
|
@ -270,6 +296,9 @@ int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32
|
|||
|
||||
int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if (tSerializeSGetUserAuthReq(pBuf, bufLen, &req) < 0) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
@ -290,6 +319,9 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_
|
|||
|
||||
int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSTableIndexReq(pBuf, bufLen, &indexReq) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -314,6 +346,9 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
|
||||
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSTableCfgReq(pBuf, bufLen, &cfgReq) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -335,6 +370,9 @@ int32_t queryBuildGetViewMetaMsg(void *input, char **msg, int32_t msgSize, int32
|
|||
|
||||
int32_t bufLen = tSerializeSViewMetaReq(NULL, 0, &req);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeSViewMetaReq(pBuf, bufLen, &req) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -357,6 +395,9 @@ int32_t queryBuildGetTableTSMAMsg(void *input, char **msg, int32_t msgSize, int3
|
|||
|
||||
int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req);
|
||||
void * pBuf = (*mallcFp)(bufLen);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeTableTSMAInfoReq(pBuf, bufLen, &req) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -379,6 +420,10 @@ int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t *
|
|||
|
||||
int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req);
|
||||
void * pBuf = (*mallcFp)(bufLen);
|
||||
if(pBuf == NULL)
|
||||
{
|
||||
return terrno;
|
||||
}
|
||||
if(tSerializeTableTSMAInfoReq(pBuf, bufLen, &req) < 0)
|
||||
{
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
|
@ -396,6 +441,9 @@ int32_t queryBuildGetStreamProgressMsg(void* input, char** msg, int32_t msgSize,
|
|||
|
||||
int32_t len = tSerializeStreamProgressReq(NULL, 0, input);
|
||||
void* pBuf = (*mallcFp)(len);
|
||||
if (NULL == pBuf) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if(tSerializeStreamProgressReq(pBuf, len, input) < 0)
|
||||
{
|
||||
|
@ -666,6 +714,9 @@ int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) {
|
|||
}
|
||||
|
||||
*(char **)output = taosStrdup(out.ver);
|
||||
if (NULL == *(char **)output) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -267,6 +267,9 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
|||
if (pMsg->pData) {
|
||||
SDecoder coder = {0};
|
||||
SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp));
|
||||
if (NULL == rsp) {
|
||||
SCH_ERR_JRET(terrno);
|
||||
}
|
||||
tDecoderInit(&coder, pMsg->pData, msgSize);
|
||||
code = tDecodeSSubmitRsp2(&coder, rsp);
|
||||
tDecoderClear(&coder);
|
||||
|
@ -961,6 +964,9 @@ int32_t schUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, SQueryNodeAddr *addr
|
|||
pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
|
||||
pMsgSendInfo->target.vgId = addr->nodeId;
|
||||
pMsgSendInfo->target.dbFName = taosStrdup(pTask->plan->dbFName);
|
||||
if (NULL == pMsgSendInfo->target.dbFName) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -303,13 +303,8 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
|||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref);
|
||||
|
||||
if (pInfo->checkRspTmr == NULL) {
|
||||
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
|
||||
} else {
|
||||
streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
|
||||
"check-status-monitor");
|
||||
}
|
||||
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
|
||||
"check-status-monitor");
|
||||
|
||||
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||
}
|
||||
|
@ -860,7 +855,7 @@ void rspMonitorFn(void* param, void* tmrId) {
|
|||
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
|
||||
}
|
||||
|
||||
streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
|
||||
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
|
||||
"check-status-monitor");
|
||||
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||
|
||||
|
|
|
@ -209,29 +209,18 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
||||
if (pDataBlock == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int64_t checkpointId = pDataBlock->info.version;
|
||||
int32_t transId = pDataBlock->info.window.skey;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
|
||||
int32_t transId) {
|
||||
int32_t code = 0;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
if (pTask->chkInfo.checkpointId > checkpointId) {
|
||||
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
|
||||
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
|
||||
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -239,37 +228,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
|
||||
" discard the checkpoint-trigger block",
|
||||
id, vgId, checkpointId, transId, pActiveInfo->failedId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pTask->chkInfo.checkpointId == checkpointId) {
|
||||
{ // send checkpoint-ready msg to upstream
|
||||
SRpcMsg msg = {0};
|
||||
SRpcMsg msg = {0};
|
||||
SStreamUpstreamEpInfo* pInfo = NULL;
|
||||
streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo);
|
||||
if (pInfo == NULL) {
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = tmsgSendReq(&pInfo->epSet, &msg);
|
||||
if (code) {
|
||||
stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stWarn(
|
||||
"s-task:%s vgId:%d recv already finished checkpoint msg, send checkpoint-ready to upstream:0x%x to resume the "
|
||||
"interrupted checkpoint",
|
||||
"s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
|
||||
"the interrupted checkpoint",
|
||||
id, vgId, pBlock->srcTaskId);
|
||||
|
||||
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -278,9 +263,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
|
||||
" discard",
|
||||
id, vgId, pActiveInfo->activeId, checkpointId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
} else { // checkpointId == pActiveInfo->activeId
|
||||
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
|
||||
|
@ -288,8 +270,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
|
||||
"checkpointId:%" PRId64 " transId:%d",
|
||||
id, vgId, checkpointId, transId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -298,7 +278,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
|
||||
STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
|
||||
if (p == NULL) {
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
|
@ -306,9 +285,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
|
||||
", prev recvTs:%" PRId64 " discard",
|
||||
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
|
||||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -316,7 +292,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||
int64_t checkpointId = 0;
|
||||
int32_t transId = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
|
||||
if (pDataBlock == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
checkpointId = pDataBlock->info.version;
|
||||
transId = pDataBlock->info.window.skey;
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
if (code) {
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
|
||||
", transId:%d current active checkpointId:%" PRId64,
|
||||
|
@ -345,12 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
||||
streamMetaAcquireOneTask(pTask);
|
||||
|
||||
if (pTmrInfo->tmrHandle == NULL) {
|
||||
pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer);
|
||||
} else {
|
||||
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
|
||||
}
|
||||
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"trigger-recv-monitor");
|
||||
pTmrInfo->launchChkptId = pActiveInfo->activeId;
|
||||
} else { // already launched, do nothing
|
||||
stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
|
||||
|
@ -367,6 +365,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
// before the next checkpoint.
|
||||
code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
|
||||
if (code) {
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -675,10 +674,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
}
|
||||
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
// persist to disk
|
||||
}
|
||||
code = streamMetaCommit(pMeta);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -893,48 +889,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
return code;
|
||||
}
|
||||
|
||||
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
static int32_t doChkptStatusCheck(SStreamTask* pTask) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
// check the status every 100ms
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
if (++pTmrInfo->activeCounter < 50) {
|
||||
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
|
||||
return;
|
||||
}
|
||||
|
||||
pTmrInfo->activeCounter = 0;
|
||||
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
if (pState.state != TASK_STATUS__CK) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
|
||||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
|
||||
|
||||
// checkpoint-trigger recv flag is set, quit
|
||||
if (pActiveInfo->allUpstreamTriggerRecv) {
|
||||
|
@ -942,48 +901,44 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
|
||||
ref);
|
||||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamMutexLock(&pActiveInfo->lock);
|
||||
|
||||
// send msg to retrieve checkpoint trigger msg
|
||||
SArray* pList = pTask->upstreamInfo.pList;
|
||||
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
||||
if (pNotSendList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
|
||||
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
|
||||
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
|
||||
return;
|
||||
// streamMutexUnlock(&pTask->lock);
|
||||
// streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
|
||||
", quit, ref:%d",
|
||||
id, vgId, pTmrInfo->launchChkptId, ref);
|
||||
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
// streamMutexUnlock(&pActiveInfo->lock);
|
||||
// streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// active checkpoint info is cleared for now
|
||||
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
|
||||
id, vgId, ref);
|
||||
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
// streamMutexUnlock(&pActiveInfo->lock);
|
||||
// streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
|
||||
const char* id = pTask->id.idStr;
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
||||
if (pNotSendList == NULL) {
|
||||
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||
|
@ -1007,13 +962,87 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
void* px = taosArrayPush(pNotSendList, pInfo);
|
||||
if (px == NULL) {
|
||||
stError("s-task:%s failed to record not send info, code: out of memory", id);
|
||||
taosArrayDestroy(pNotSendList);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*ppNotSendList = pNotSendList;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
|
||||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
// check the status every 100ms
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
if (++pTmrInfo->activeCounter < 50) {
|
||||
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
|
||||
return;
|
||||
}
|
||||
|
||||
pTmrInfo->activeCounter = 0;
|
||||
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
SStreamTaskState state = streamTaskGetStatus(pTask);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
if (state.state != TASK_STATUS__CK) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id,
|
||||
vgId, state.name, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
streamMutexLock(&pActiveInfo->lock);
|
||||
|
||||
int32_t code = doChkptStatusCheck(pTask);
|
||||
if (code) {
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
// send msg to retrieve checkpoint trigger msg
|
||||
SArray* pList = pTask->upstreamInfo.pList;
|
||||
SArray* pNotSendList = NULL;
|
||||
|
||||
code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
|
||||
if (code) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref);
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
|
||||
taosArrayDestroy(pNotSendList);
|
||||
return;
|
||||
}
|
||||
|
||||
// do send retrieve checkpoint trigger msg to upstream
|
||||
int32_t size = taosArrayGetSize(pNotSendList);
|
||||
int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
|
||||
code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
|
||||
if (code) {
|
||||
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
|
||||
}
|
||||
|
@ -1023,7 +1052,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
|||
// check every 100ms
|
||||
if (size > 0) {
|
||||
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
|
||||
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
|
||||
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
|
||||
} else {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);
|
||||
|
|
|
@ -634,12 +634,8 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
|
|||
|
||||
void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
if (pTask->msgInfo.pRetryTmr != NULL) {
|
||||
streamTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId,
|
||||
"dispatch-monitor-tmr");
|
||||
} else {
|
||||
pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer);
|
||||
}
|
||||
streamTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId,
|
||||
"dispatch-monitor");
|
||||
}
|
||||
|
||||
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId,
|
||||
|
@ -888,77 +884,42 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
const char* id = pTask->id.idStr;
|
||||
static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) {
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
|
||||
// check the status every 100ms
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
if (++pTmrInfo->activeCounter < 50) {
|
||||
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
return;
|
||||
}
|
||||
|
||||
pTmrInfo->activeCounter = 0;
|
||||
stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id);
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
if (pState.state != TASK_STATUS__CK) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
|
||||
pState.name, ref);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamMutexLock(&pActiveInfo->lock);
|
||||
|
||||
SArray* pList = pActiveInfo->pReadyMsgList;
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
|
||||
", quit, ref:%d",
|
||||
id, vgId, pTmrInfo->launchChkptId, ref);
|
||||
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// active checkpoint info is cleared for now
|
||||
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);
|
||||
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));
|
||||
|
||||
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id,
|
||||
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref);
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level,
|
||||
const char* id) {
|
||||
SArray* pTmp = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pTmp == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
|
@ -971,63 +932,138 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
|||
continue;
|
||||
}
|
||||
|
||||
void* p = taosArrayPush(pNotRspList, &pInfo->upstreamTaskId);
|
||||
void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId);
|
||||
if (p == NULL) {
|
||||
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
|
||||
return terrno;
|
||||
} else {
|
||||
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId,
|
||||
pTask->info.taskLevel, pInfo->upstreamTaskId);
|
||||
level, pInfo->upstreamTaskId);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t checkpointId = pActiveInfo->activeId;
|
||||
*ppNotRspList = pTmp;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t notRsp = taosArrayGetSize(pNotRspList);
|
||||
if (notRsp > 0) { // send checkpoint-ready msg again
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
|
||||
int32_t* pTaskId = taosArrayGet(pNotRspList, i);
|
||||
if (pTaskId == NULL) {
|
||||
static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t checkpointId, SArray* pReadyList) {
|
||||
int32_t code = 0;
|
||||
int32_t num = taosArrayGetSize(pReadyList);
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
|
||||
int32_t* pTaskId = taosArrayGet(pNotRspList, i);
|
||||
if (pTaskId == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < num; ++j) {
|
||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pReadyList, j);
|
||||
if (pReadyInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < num; ++j) {
|
||||
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j);
|
||||
if (pReadyInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again
|
||||
|
||||
if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again
|
||||
|
||||
SRpcMsg msg = {0};
|
||||
int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId,
|
||||
pReadyInfo->childId, checkpointId, &msg);
|
||||
SRpcMsg msg = {0};
|
||||
code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId,
|
||||
pReadyInfo->childId, checkpointId, &msg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel,
|
||||
pReadyInfo->upstreamTaskId);
|
||||
} else {
|
||||
stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id);
|
||||
}
|
||||
stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel,
|
||||
pReadyInfo->upstreamTaskId);
|
||||
} else {
|
||||
stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id);
|
||||
stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id);
|
||||
}
|
||||
} else {
|
||||
stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
|
||||
SStreamTask* pTask = param;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
const char* id = pTask->id.idStr;
|
||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
|
||||
SArray* pNotRspList = NULL;
|
||||
|
||||
// check the status every 100ms
|
||||
if (streamTaskShouldStop(pTask)) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
if (++pTmrInfo->activeCounter < 50) {
|
||||
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
return;
|
||||
}
|
||||
|
||||
// reset tmr
|
||||
pTmrInfo->activeCounter = 0;
|
||||
stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id);
|
||||
|
||||
streamMutexLock(&pTask->lock);
|
||||
SStreamTaskState state = streamTaskGetStatus(pTask);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
// 1. check status in the first place
|
||||
if (state.state != TASK_STATUS__CK) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready, ref:%d", id, vgId,
|
||||
state.name, ref);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
streamMutexLock(&pActiveInfo->lock);
|
||||
|
||||
SArray* pList = pActiveInfo->pReadyMsgList;
|
||||
int32_t num = taosArrayGetSize(pList);
|
||||
int32_t code = doTaskChkptStatusCheck(pTask, num);
|
||||
if (code) {
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
|
||||
if (code) {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id,
|
||||
tstrerror(code), ref);
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
|
||||
taosArrayDestroy(pNotRspList);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t checkpointId = pActiveInfo->activeId;
|
||||
int32_t notRsp = taosArrayGetSize(pNotRspList);
|
||||
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
|
||||
|
||||
if (notRsp > 0) { // send checkpoint-ready msg again
|
||||
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
} else {
|
||||
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
|
||||
stDebug(
|
||||
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg "
|
||||
"and quit from timer, ref:%d",
|
||||
"s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit "
|
||||
"from timer, ref:%d",
|
||||
id, vgId, ref);
|
||||
|
||||
streamClearChkptReadyMsg(pActiveInfo);
|
||||
streamMutexUnlock(&pActiveInfo->lock);
|
||||
// release should be the last execution, since pTask may be destroy after it immidiately.
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
}
|
||||
|
||||
|
@ -1085,12 +1121,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
|||
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
||||
streamMetaAcquireOneTask(pTask);
|
||||
|
||||
if (pTmrInfo->tmrHandle == NULL) {
|
||||
pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer);
|
||||
} else {
|
||||
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
}
|
||||
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
|
||||
// mark the timer monitor checkpointId
|
||||
pTmrInfo->launchChkptId = pActiveInfo->activeId;
|
||||
|
|
|
@ -279,7 +279,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
|
|||
}
|
||||
|
||||
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
|
||||
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
|
||||
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
|
||||
"meta-hb-tmr");
|
||||
|
||||
code = taosReleaseRef(streamMetaId, rid);
|
||||
|
@ -301,7 +301,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
|
|||
}
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
|
||||
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
|
||||
"meta-hb-tmr");
|
||||
|
||||
code = taosReleaseRef(streamMetaId, rid);
|
||||
|
@ -317,7 +317,7 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
|
||||
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pInfo->hbTmr, 0, "stream-hb");
|
||||
pInfo->tickCounter = 0;
|
||||
pInfo->msgSendTs = -1;
|
||||
pInfo->hbCount = 0;
|
||||
|
|
|
@ -458,9 +458,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
|
|||
code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
|
||||
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
|
||||
TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno);
|
||||
|
||||
code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
|
||||
TSDB_CHECK_CODE(code, lino, _err);
|
||||
|
||||
|
@ -629,9 +626,6 @@ void streamMetaCloseImpl(void* arg) {
|
|||
taosMemoryFree(pMeta->path);
|
||||
streamMutexDestroy(&pMeta->backendMutex);
|
||||
|
||||
taosCleanUpScheduler(pMeta->qHandle);
|
||||
taosMemoryFree(pMeta->qHandle);
|
||||
|
||||
bkdMgtDestroy(pMeta->bkdChkptMgt);
|
||||
|
||||
pMeta->role = NODE_ROLE_UNINIT;
|
||||
|
@ -929,32 +923,38 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
|||
streamMetaWLock(pMeta);
|
||||
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||
if (code) {
|
||||
streamSetFatalError(pMeta, code, __func__, __LINE__);
|
||||
}
|
||||
streamMetaWUnLock(pMeta);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta) {
|
||||
int32_t code = 0;
|
||||
code = tdbCommit(pMeta->db, pMeta->txn);
|
||||
int32_t code = tdbCommit(pMeta->db, pMeta->txn);
|
||||
if (code != 0) {
|
||||
stError("vgId:%d failed to commit stream meta", pMeta->vgId);
|
||||
return code;
|
||||
streamSetFatalError(pMeta, code, __func__, __LINE__);
|
||||
stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
|
||||
pMeta->fatalInfo.line);
|
||||
}
|
||||
|
||||
code = tdbPostCommit(pMeta->db, pMeta->txn);
|
||||
if (code != 0) {
|
||||
stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId);
|
||||
streamSetFatalError(pMeta, code, __func__, __LINE__);
|
||||
stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
|
||||
pMeta->fatalInfo.line);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||
if (code != 0) {
|
||||
stError("vgId:%d failed to begin trans", pMeta->vgId);
|
||||
return code;
|
||||
streamSetFatalError(pMeta, code, __func__, __LINE__);
|
||||
stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line);
|
||||
} else {
|
||||
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
|
||||
}
|
||||
|
||||
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1261,40 +1261,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
|
|||
streamMetaHbToMnode(pRid, NULL);
|
||||
}
|
||||
|
||||
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaRUnLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
} else {
|
||||
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaWLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaWUnLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
|
||||
QRY_PARAM_CHECK(pList);
|
||||
|
||||
|
@ -1398,60 +1364,6 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
int64_t startTs = 0;
|
||||
bool hasFillhistoryTask = false;
|
||||
STaskId hId = {0};
|
||||
|
||||
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
|
||||
|
||||
streamMetaRLock(pMeta);
|
||||
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
|
||||
if (ppTask != NULL) {
|
||||
startTs = (*ppTask)->taskCheckInfo.startTs;
|
||||
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
|
||||
hId = (*ppTask)->hTaskInfo.id;
|
||||
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
// add the failed task info, along with the related fill-history task info into tasks list.
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
||||
if (hasFillhistoryTask) {
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
||||
}
|
||||
} else {
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||
streamId, taskId, pMeta->vgId);
|
||||
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
|
||||
int32_t startTs = pTask->execInfo.checkTs;
|
||||
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
// automatically set the related fill-history task to be failed.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
|
||||
int64_t startTs) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
|
|
@ -20,13 +20,14 @@ static void streamTaskResumeHelper(void* param, void* tmrId);
|
|||
static void streamTaskSchedHelper(void* param, void* tmrId);
|
||||
|
||||
void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||
int64_t delaySchema = pTask->info.delaySchedParam;
|
||||
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
|
||||
pTask->info.delaySchedParam);
|
||||
|
||||
pTask->schedInfo.pDelayTimer =
|
||||
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
|
||||
streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer,
|
||||
pTask->pMeta->vgId, "sched-tmr");
|
||||
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
|
||||
}
|
||||
}
|
||||
|
@ -76,13 +77,8 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
|
|||
|
||||
// add one ref count for task
|
||||
streamMetaAcquireOneTask(pTask);
|
||||
|
||||
if (pTask->schedInfo.pIdleTimer == NULL) {
|
||||
pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer);
|
||||
} else {
|
||||
streamTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer,
|
||||
&pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr");
|
||||
}
|
||||
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
|
||||
pTask->pMeta->vgId, "resume-task-tmr");
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -125,7 +121,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
|
||||
|
||||
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
|
||||
stDebug("s-task:%s jump out of schedTimer", id);
|
||||
stDebug("s-task:%s should stop, jump out of schedTimer", id);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -139,9 +135,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
if (code) {
|
||||
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
|
||||
nextTrigger);
|
||||
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
|
||||
terrno = code;
|
||||
return;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pTrigger->type = STREAM_INPUT__GET_RES;
|
||||
|
@ -149,10 +144,9 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
if (pTrigger->pBlock == NULL) {
|
||||
taosFreeQitem(pTrigger);
|
||||
|
||||
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
|
||||
stError("s-task:%s failed to build retrieve data trigger, code:out of memory, try again in %dms", id,
|
||||
nextTrigger);
|
||||
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
|
||||
return;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
|
||||
|
@ -160,8 +154,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
|
||||
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
|
||||
return;
|
||||
stError("s-task:%s failed to put retrieve block into trigger, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
goto _end;
|
||||
}
|
||||
|
||||
code = streamTrySchedExec(pTask);
|
||||
|
@ -171,5 +165,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
}
|
||||
}
|
||||
|
||||
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
|
||||
_end:
|
||||
streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
|
||||
"sched-run-tmr");
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
|||
}
|
||||
|
||||
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
|
||||
if (numOfTicks <= 0) {
|
||||
numOfTicks = 1;
|
||||
|
@ -100,14 +101,8 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
|
|||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
|
||||
|
||||
if (pTask->schedHistoryInfo.pTimer == NULL) {
|
||||
pTask->schedHistoryInfo.pTimer =
|
||||
taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
|
||||
} else {
|
||||
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
|
||||
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
|
||||
}
|
||||
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
|
||||
&pTask->schedHistoryInfo.pTimer, vgId, "history-task");
|
||||
}
|
||||
|
||||
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
|
||||
|
@ -337,7 +332,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
|
|||
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
|
||||
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
|
||||
|
||||
streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
|
||||
streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
|
||||
pTask->pMeta->vgId, " start-history-task-tmr");
|
||||
}
|
||||
}
|
||||
|
@ -391,7 +386,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
|
|||
|
||||
pHTaskInfo->tickCount -= 1;
|
||||
if (pHTaskInfo->tickCount > 0) {
|
||||
streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
|
||||
streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
|
||||
pTask->pMeta->vgId, " start-history-task-tmr");
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return;
|
||||
|
@ -519,7 +514,7 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
stDebug("s-task:%s set timer active flag, task timer not null", idStr);
|
||||
streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
|
||||
streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
|
||||
pTask->pMeta->vgId, " start-history-task-tmr");
|
||||
}
|
||||
|
||||
|
@ -621,8 +616,8 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
|
|||
// release the task.
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
} else {
|
||||
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
|
||||
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
|
||||
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
|
||||
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -30,8 +30,8 @@ typedef struct STaskInitTs {
|
|||
} STaskInitTs;
|
||||
|
||||
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
|
||||
static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
|
||||
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
|
||||
static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
|
||||
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
|
||||
|
||||
// restore the checkpoint id by negotiating the latest consensus checkpoint id
|
||||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||
|
@ -505,3 +505,57 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
|
|||
|
||||
stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts);
|
||||
}
|
||||
|
||||
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
int64_t startTs = 0;
|
||||
bool hasFillhistoryTask = false;
|
||||
STaskId hId = {0};
|
||||
|
||||
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
|
||||
|
||||
streamMetaRLock(pMeta);
|
||||
|
||||
STaskId id = {.streamId = streamId, .taskId = taskId};
|
||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
|
||||
if (ppTask != NULL) {
|
||||
startTs = (*ppTask)->taskCheckInfo.startTs;
|
||||
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
|
||||
hId = (*ppTask)->hTaskInfo.id;
|
||||
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
// add the failed task info, along with the related fill-history task info into tasks list.
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
|
||||
if (hasFillhistoryTask) {
|
||||
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
|
||||
}
|
||||
} else {
|
||||
streamMetaRUnLock(pMeta);
|
||||
|
||||
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||
streamId, taskId, pMeta->vgId);
|
||||
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
|
||||
int32_t startTs = pTask->execInfo.checkTs;
|
||||
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
|
||||
// automatically set the related fill-history task to be failed.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
|
||||
if (code) {
|
||||
stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -801,8 +801,8 @@ bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
|
|||
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
|
||||
ret = true;
|
||||
}
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
|
|
@ -491,8 +491,8 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
|
|||
|
||||
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
|
||||
SStreamTask* pTask = pSM->pTask;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t code = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t code = 0;
|
||||
|
||||
// do update the task status
|
||||
streamMutexLock(&pTask->lock);
|
||||
|
|
|
@ -40,11 +40,23 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
|
||||
void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId,
|
||||
const char* pMsg) {
|
||||
bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId);
|
||||
if (ret) {
|
||||
if (*pTmrId == NULL) {
|
||||
*pTmrId = taosTmrStart(fp, mseconds, pParam, pHandle);
|
||||
if (*pTmrId == NULL) {
|
||||
stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno));
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
bool ret = taosTmrReset(fp, mseconds, pParam, pHandle, pTmrId);
|
||||
if (ret) {
|
||||
stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
stDebug("vgId:%d start %s tmr succ", vgId, pMsg);
|
||||
}
|
||||
|
||||
void streamTmrStop(tmr_h tmrId) {
|
||||
|
|
|
@ -35,3 +35,55 @@ void streamMutexDestroy(TdThreadMutex *pMutex) {
|
|||
stError("%p mutex destroy, code:%s", pMutex, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaRUnLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
|
||||
} else {
|
||||
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaWLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamMetaWUnLock(SStreamMeta* pMeta) {
|
||||
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
|
||||
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino) {
|
||||
int32_t oldCode = atomic_val_compare_exchange_32(&pMeta->fatalInfo.code, 0, code);
|
||||
if (oldCode == 0) {
|
||||
pMeta->fatalInfo.ts = taosGetTimestampMs();
|
||||
pMeta->fatalInfo.threadId = taosGetSelfPthreadId();
|
||||
tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func));
|
||||
pMeta->fatalInfo.line = lino;
|
||||
stInfo("vgId:%d set fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino);
|
||||
} else {
|
||||
stFatal("vgId:%d existed fatal error:%s, ts:%" PRId64 " failed to set new fatal error code:%s", pMeta->vgId,
|
||||
tstrerror(pMeta->fatalInfo.code), pMeta->fatalInfo.ts, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamGetFatalError(const SStreamMeta* pMeta) {
|
||||
return atomic_load_32((volatile int32_t*) &pMeta->fatalInfo.code);
|
||||
}
|
||||
|
|
|
@ -616,7 +616,7 @@ static void httpHandleReq(SHttpMsg* msg) {
|
|||
int32_t fd = taosCreateSocketWithTimeout(5000);
|
||||
if (fd < 0) {
|
||||
tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr,
|
||||
cli->port, chanId, cli->seq, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
cli->port, chanId, cli->seq, tstrerror(terrno));
|
||||
destroyHttpClient(cli);
|
||||
(void)taosReleaseRef(httpRefMgt, chanId);
|
||||
return;
|
||||
|
|
|
@ -31,23 +31,35 @@ int32_t taosSetSignal(int32_t signum, FSignalHandler sigfp) {
|
|||
|
||||
// SIGHUP doesn't exist in windows, we handle it in the way of ctrlhandler
|
||||
if (signum == SIGHUP) {
|
||||
SetConsoleCtrlHandler((PHANDLER_ROUTINE)sigfp, TRUE);
|
||||
if(SetConsoleCtrlHandler((PHANDLER_ROUTINE)sigfp, TRUE) == 0) {
|
||||
terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
signal(signum, (FWinSignalHandler)sigfp);
|
||||
if(signal(signum, (FWinSignalHandler)sigfp) == SIG_ERR) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t taosIgnSignal(int32_t signum) {
|
||||
if (signum == SIGUSR1 || signum == SIGHUP) return 0;
|
||||
signal(signum, SIG_IGN);
|
||||
if(signal(signum, SIG_IGN) == SIG_ERR) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t taosDflSignal(int32_t signum) {
|
||||
if (signum == SIGUSR1 || signum == SIGHUP) return 0;
|
||||
signal(signum, SIG_DFL);
|
||||
if(signal(signum, SIG_DFL) == SIG_ERR) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -300,14 +300,18 @@ int32_t taosSetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void
|
|||
}
|
||||
#endif
|
||||
|
||||
return setsockopt(pSocket->fd, level, optname, optval, optlen);
|
||||
int ret = setsockopt(pSocket->fd, level, optname, optval, optlen);
|
||||
if (ret == SOCKET_ERROR) {
|
||||
int errorCode = WSAGetLastError();
|
||||
return terrno = TAOS_SYSTEM_WINSOCKET_ERROR(errorCode);
|
||||
}
|
||||
#else
|
||||
int32_t code = setsockopt(pSocket->fd, level, optname, optval, (int)optlen);
|
||||
if (-1 == code) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
return code;
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -325,19 +329,6 @@ int32_t taosGetSockOpt(TdSocketPtr pSocket, int32_t level, int32_t optname, void
|
|||
|
||||
#endif
|
||||
|
||||
uint32_t taosInetAddr(const char *ipAddr) {
|
||||
#ifdef WINDOWS
|
||||
uint32_t value;
|
||||
int32_t ret = inet_pton(AF_INET, ipAddr, &value);
|
||||
if (ret <= 0) {
|
||||
return INADDR_NONE;
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
#else
|
||||
return inet_addr(ipAddr);
|
||||
#endif
|
||||
}
|
||||
const char *taosInetNtoa(struct in_addr ipInt, char *dstStr, int32_t len) {
|
||||
const char *r = inet_ntop(AF_INET, &ipInt, dstStr, len);
|
||||
if (NULL == r) {
|
||||
|
@ -943,8 +934,7 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) {
|
|||
int iResult;
|
||||
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
|
||||
if (iResult != 0) {
|
||||
// printf("WSAStartup failed: %d\n", iResult);
|
||||
return 0xFFFFFFFF;
|
||||
return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError());
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -1008,7 +998,7 @@ int32_t taosGetIpv4FromFqdn(const char *fqdn, uint32_t *ip) {
|
|||
#endif
|
||||
|
||||
*ip = 0xFFFFFFFF;
|
||||
return 0xFFFFFFFF;
|
||||
return TSDB_CODE_RPC_FQDN_ERROR;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
@ -1021,7 +1011,7 @@ int32_t taosGetFqdn(char *fqdn) {
|
|||
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
|
||||
if (iResult != 0) {
|
||||
// printf("WSAStartup failed: %d\n", iResult);
|
||||
return 1;
|
||||
return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError());
|
||||
}
|
||||
#endif
|
||||
char hostname[1024];
|
||||
|
@ -1077,8 +1067,8 @@ int32_t taosGetFqdn(char *fqdn) {
|
|||
|
||||
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
|
||||
if (!result) {
|
||||
fprintf(stderr, "failed to get fqdn, code:%d, hostname:%s, reason:%s\n", ret, hostname, gai_strerror(ret));
|
||||
return -1;
|
||||
//fprintf(stderr, "failed to get fqdn, code:%d, hostname:%s, reason:%s\n", ret, hostname, gai_strerror(ret));
|
||||
return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError());
|
||||
}
|
||||
strcpy(fqdn, result->ai_canonname);
|
||||
freeaddrinfo(result);
|
||||
|
@ -1140,13 +1130,13 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
|
|||
|
||||
if ((fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
#if defined(WINDOWS)
|
||||
if (0 != setsockopt(fd, IPPROTO_TCP, TCP_MAXRT, (char *)&timeout, sizeof(timeout))) {
|
||||
taosCloseSocketNoCheck1(fd);
|
||||
return -1;
|
||||
return TAOS_SYSTEM_WINSOCKET_ERROR(WSAGetLastError());
|
||||
}
|
||||
#elif defined(_TD_DARWIN_64)
|
||||
// invalid config
|
||||
|
@ -1160,7 +1150,7 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
|
|||
if (-1 == setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, (char *)&conn_timeout_ms, sizeof(conn_timeout_ms))) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_SKIP_ERROR(taosCloseSocketNoCheck1(fd));
|
||||
return -1;
|
||||
return terrno;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
|
|
@ -216,14 +216,14 @@ int32_t taosConvInit(void) {
|
|||
|
||||
for (int32_t i = 0; i < gConvMaxNum[M2C]; ++i) {
|
||||
gConv[M2C][i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset);
|
||||
if ((iconv_t)-1 == gConv[M2C][i].conv || (iconv_t)0 == gConv[M2C][i].conv) {
|
||||
if ((iconv_t)-1 == gConv[M2C][i].conv) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
for (int32_t i = 0; i < gConvMaxNum[1 - M2C]; ++i) {
|
||||
gConv[1 - M2C][i].conv = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC);
|
||||
if ((iconv_t)-1 == gConv[1 - M2C][i].conv || (iconv_t)0 == gConv[1 - M2C][i].conv) {
|
||||
if ((iconv_t)-1 == gConv[1 - M2C][i].conv) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return terrno;
|
||||
}
|
||||
|
@ -251,13 +251,13 @@ iconv_t taosAcquireConv(int32_t *idx, ConvType type) {
|
|||
*idx = -1;
|
||||
if (type == M2C) {
|
||||
iconv_t c = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset);
|
||||
if ((iconv_t)-1 == c || (iconv_t)0 == c) {
|
||||
if ((iconv_t)-1 == c) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
return c;
|
||||
} else {
|
||||
iconv_t c = iconv_open(tsCharset, DEFAULT_UNICODE_ENCODEC);
|
||||
if ((iconv_t)-1 == c || (iconv_t)0 == c) {
|
||||
if ((iconv_t)-1 == c) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
return c;
|
||||
|
@ -289,7 +289,11 @@ iconv_t taosAcquireConv(int32_t *idx, ConvType type) {
|
|||
}
|
||||
|
||||
*idx = startId;
|
||||
return gConv[type][startId].conv;
|
||||
if ((iconv_t)0 == gConv[type][startId].conv) {
|
||||
return (iconv_t)-1;
|
||||
} else {
|
||||
return gConv[type][startId].conv;
|
||||
}
|
||||
}
|
||||
|
||||
void taosReleaseConv(int32_t idx, iconv_t conv, ConvType type) {
|
||||
|
@ -312,7 +316,7 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4
|
|||
|
||||
int32_t idx = -1;
|
||||
iconv_t conv = taosAcquireConv(&idx, M2C);
|
||||
if ((iconv_t)-1 == conv || (iconv_t)0 == conv) {
|
||||
if ((iconv_t)-1 == conv) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -350,9 +354,8 @@ int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) {
|
|||
int32_t idx = -1;
|
||||
int32_t code = 0;
|
||||
iconv_t conv = taosAcquireConv(&idx, C2M);
|
||||
if ((iconv_t)-1 == conv || (iconv_t)0 == conv) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);;
|
||||
return code;
|
||||
if ((iconv_t)-1 == conv) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
size_t ucs4_input_len = ucs4_max_len;
|
||||
|
|
|
@ -93,6 +93,10 @@ LONG WINAPI FlCrashDump(PEXCEPTION_POINTERS ep) {
|
|||
path[len - 1] = 'p';
|
||||
|
||||
HANDLE file = CreateFile(path, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL);
|
||||
if (file == INVALID_HANDLE_VALUE) {
|
||||
FreeLibrary(dll);
|
||||
return EXCEPTION_CONTINUE_SEARCH;
|
||||
}
|
||||
|
||||
MINIDUMP_EXCEPTION_INFORMATION mei;
|
||||
mei.ThreadId = GetCurrentThreadId();
|
||||
|
@ -333,7 +337,10 @@ bool getWinVersionReleaseName(char *releaseName, int32_t maxLen) {
|
|||
UINT uLen;
|
||||
VS_FIXEDFILEINFO *pFileInfo;
|
||||
|
||||
GetWindowsDirectory(szFileName, MAX_PATH);
|
||||
int ret = GetWindowsDirectory(szFileName, MAX_PATH);
|
||||
if (ret == 0) {
|
||||
return false;
|
||||
}
|
||||
wsprintf(szFileName, L"%s%s", szFileName, L"\\explorer.exe");
|
||||
dwLen = GetFileVersionInfoSize(szFileName, &dwHandle);
|
||||
if (dwLen == 0) {
|
||||
|
@ -373,12 +380,12 @@ int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t
|
|||
|
||||
if(sName) snprintf(sName, maxLen, "macOS");
|
||||
if (sysctl(osversion_name, 2, osversion, &osversion_len, NULL, 0) == -1) {
|
||||
return -1;
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
uint32_t major, minor;
|
||||
if (sscanf(osversion, "%u.%u", &major, &minor) != 2) {
|
||||
return -1;
|
||||
if (sscanf(osversion, "%u.%u", &major, &minor) == EOF) {
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
if (major >= 20) {
|
||||
major -= 9; // macOS 11 and newer
|
||||
|
@ -433,8 +440,11 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
|
|||
#ifdef WINDOWS
|
||||
char value[100];
|
||||
DWORD bufferSize = sizeof(value);
|
||||
RegGetValue(HKEY_LOCAL_MACHINE, "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0", "ProcessorNameString",
|
||||
LSTATUS ret = RegGetValue(HKEY_LOCAL_MACHINE, "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0", "ProcessorNameString",
|
||||
RRF_RT_ANY, NULL, (PVOID)&value, &bufferSize);
|
||||
if (ret != ERROR_SUCCESS) {
|
||||
return TAOS_SYSTEM_ERROR(ret);
|
||||
}
|
||||
tstrncpy(cpuModel, value, maxLen);
|
||||
SYSTEM_INFO si;
|
||||
memset(&si, 0, sizeof(SYSTEM_INFO));
|
||||
|
@ -696,7 +706,7 @@ int32_t taosGetTotalMemory(int64_t *totalKB) {
|
|||
MEMORYSTATUSEX memsStat;
|
||||
memsStat.dwLength = sizeof(memsStat);
|
||||
if (!GlobalMemoryStatusEx(&memsStat)) {
|
||||
return -1;
|
||||
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
|
||||
}
|
||||
|
||||
*totalKB = memsStat.ullTotalPhys / 1024;
|
||||
|
@ -705,6 +715,9 @@ int32_t taosGetTotalMemory(int64_t *totalKB) {
|
|||
return 0;
|
||||
#else
|
||||
*totalKB = (int64_t)(sysconf(_SC_PHYS_PAGES) * tsPageSizeKB);
|
||||
if(*totalKB <= 0) {
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
@ -760,7 +773,7 @@ int32_t taosGetSysMemory(int64_t *usedKB) {
|
|||
MEMORYSTATUSEX memsStat;
|
||||
memsStat.dwLength = sizeof(memsStat);
|
||||
if (!GlobalMemoryStatusEx(&memsStat)) {
|
||||
return -1;
|
||||
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
|
||||
}
|
||||
|
||||
int64_t nMemFree = memsStat.ullAvailPhys / 1024;
|
||||
|
@ -773,6 +786,9 @@ int32_t taosGetSysMemory(int64_t *usedKB) {
|
|||
return 0;
|
||||
#else
|
||||
*usedKB = sysconf(_SC_AVPHYS_PAGES) * tsPageSizeKB;
|
||||
if(*usedKB <= 0) {
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
@ -832,7 +848,7 @@ int32_t taosGetProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int
|
|||
if (write_bytes) *write_bytes = 0;
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
|
||||
#elif defined(_TD_DARWIN_64)
|
||||
if (rchars) *rchars = 0;
|
||||
if (wchars) *wchars = 0;
|
||||
|
@ -1022,7 +1038,10 @@ void taosKillSystem() {
|
|||
int32_t taosGetSystemUUID(char *uid, int32_t uidlen) {
|
||||
#ifdef WINDOWS
|
||||
GUID guid;
|
||||
CoCreateGuid(&guid);
|
||||
HRESULT h = CoCreateGuid(&guid);
|
||||
if (h != S_OK) {
|
||||
return TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
|
||||
}
|
||||
snprintf(uid, uidlen, "%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X", guid.Data1, guid.Data2, guid.Data3,
|
||||
guid.Data4[0], guid.Data4[1], guid.Data4[2], guid.Data4[3], guid.Data4[4], guid.Data4[5], guid.Data4[6],
|
||||
guid.Data4[7]);
|
||||
|
@ -1226,14 +1245,13 @@ SysNameInfo taosGetSysNameInfo() {
|
|||
}
|
||||
|
||||
char localHostName[512];
|
||||
taosGetlocalhostname(localHostName, 512);
|
||||
TAOS_SKIP_ERROR(taosGetlocalhostname(localHostName, 512));
|
||||
TdCmdPtr pCmd = taosOpenCmd("scutil --get LocalHostName");
|
||||
tstrncpy(info.nodename, localHostName, sizeof(info.nodename));
|
||||
|
||||
return info;
|
||||
#else
|
||||
SysNameInfo info = {0};
|
||||
|
||||
struct utsname uts;
|
||||
if (!uname(&uts)) {
|
||||
tstrncpy(info.sysname, uts.sysname, sizeof(info.sysname));
|
||||
|
@ -1269,7 +1287,7 @@ bool taosCheckCurrentInDll() {
|
|||
}
|
||||
|
||||
#ifdef _TD_DARWIN_64
|
||||
int taosGetMaclocalhostnameByCommand(char *hostname, size_t maxLen) {
|
||||
int32_t taosGetMaclocalhostnameByCommand(char *hostname, size_t maxLen) {
|
||||
TdCmdPtr pCmd = taosOpenCmd("scutil --get LocalHostName");
|
||||
if (pCmd != NULL) {
|
||||
if (taosGetsCmd(pCmd, maxLen - 1, hostname) > 0) {
|
||||
|
@ -1281,10 +1299,10 @@ int taosGetMaclocalhostnameByCommand(char *hostname, size_t maxLen) {
|
|||
}
|
||||
taosCloseCmd(&pCmd);
|
||||
}
|
||||
return -1;
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
int getMacLocalHostNameBySCD(char *hostname, size_t maxLen) {
|
||||
int32_t getMacLocalHostNameBySCD(char *hostname, size_t maxLen) {
|
||||
SCDynamicStoreRef store = SCDynamicStoreCreate(NULL, CFSTR(""), NULL, NULL);
|
||||
CFStringRef hostname_cfstr = SCDynamicStoreCopyLocalHostName(store);
|
||||
if (hostname_cfstr != NULL) {
|
||||
|
@ -1298,7 +1316,7 @@ int getMacLocalHostNameBySCD(char *hostname, size_t maxLen) {
|
|||
}
|
||||
#endif
|
||||
|
||||
int taosGetlocalhostname(char *hostname, size_t maxLen) {
|
||||
int32_t taosGetlocalhostname(char *hostname, size_t maxLen) {
|
||||
#ifdef _TD_DARWIN_64
|
||||
int res = getMacLocalHostNameBySCD(hostname, maxLen);
|
||||
if (res != 0) {
|
||||
|
|
|
@ -31,7 +31,7 @@ void WINAPI windowsServiceCtrlHandle(DWORD request) {
|
|||
ServiceStatus.dwCurrentState = SERVICE_STOP_PENDING;
|
||||
if (!SetServiceStatus(hServiceStatusHandle, &ServiceStatus)) {
|
||||
DWORD nError = GetLastError();
|
||||
printf("failed to send stopped status to windows service: %d", nError);
|
||||
fprintf(stderr, "failed to send stopped status to windows service: %d", nError);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
@ -50,19 +50,19 @@ void WINAPI mainWindowsService(int argc, char** argv) {
|
|||
hServiceStatusHandle = RegisterServiceCtrlHandler("taosd", &windowsServiceCtrlHandle);
|
||||
if (hServiceStatusHandle == 0) {
|
||||
DWORD nError = GetLastError();
|
||||
printf("failed to register windows service ctrl handler: %d", nError);
|
||||
fprintf(stderr, "failed to register windows service ctrl handler: %d", nError);
|
||||
}
|
||||
|
||||
ServiceStatus.dwCurrentState = SERVICE_RUNNING;
|
||||
if (SetServiceStatus(hServiceStatusHandle, &ServiceStatus)) {
|
||||
DWORD nError = GetLastError();
|
||||
printf("failed to send running status to windows service: %d", nError);
|
||||
fprintf(stderr, "failed to send running status to windows service: %d", nError);
|
||||
}
|
||||
if (mainWindowsFunc != NULL) mainWindowsFunc(argc, argv);
|
||||
ServiceStatus.dwCurrentState = SERVICE_STOPPED;
|
||||
if (!SetServiceStatus(hServiceStatusHandle, &ServiceStatus)) {
|
||||
DWORD nError = GetLastError();
|
||||
printf("failed to send stopped status to windows service: %d", nError);
|
||||
fprintf(stderr, "failed to send stopped status to windows service: %d", nError);
|
||||
}
|
||||
}
|
||||
void stratWindowsService(MainWindows mainWindows) {
|
||||
|
@ -140,17 +140,27 @@ void taosCloseDll(void* handle) {
|
|||
}
|
||||
#endif
|
||||
|
||||
int taosSetConsoleEcho(bool on) {
|
||||
int32_t taosSetConsoleEcho(bool on) {
|
||||
#if defined(WINDOWS)
|
||||
HANDLE hStdin = GetStdHandle(STD_INPUT_HANDLE);
|
||||
if (hStdin == INVALID_HANDLE_VALUE) {
|
||||
terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
|
||||
return terrno;
|
||||
}
|
||||
DWORD mode = 0;
|
||||
GetConsoleMode(hStdin, &mode);
|
||||
if(!GetConsoleMode(hStdin, &mode)){
|
||||
terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
|
||||
return terrno;
|
||||
}
|
||||
if (on) {
|
||||
mode |= ENABLE_ECHO_INPUT;
|
||||
} else {
|
||||
mode &= ~ENABLE_ECHO_INPUT;
|
||||
}
|
||||
SetConsoleMode(hStdin, mode);
|
||||
if(!SetConsoleMode(hStdin, mode)) {
|
||||
terrno = TAOS_SYSTEM_WINAPI_ERROR(GetLastError());
|
||||
return terrno;
|
||||
}
|
||||
|
||||
return 0;
|
||||
#else
|
||||
|
|
Loading…
Reference in New Issue