Merge branch '3.0' into fix/TD-19223-D
This commit is contained in:
commit
89c0f7611b
|
@ -2,7 +2,7 @@
|
|||
# taos-tools
|
||||
ExternalProject_Add(taos-tools
|
||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||
GIT_TAG 9284147
|
||||
GIT_TAG cc973e0
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -247,7 +247,7 @@ CREATE TOPIC topic_name as subquery
|
|||
|
||||
- 该类型 TOPIC 一旦创建则订阅数据的结构确定。
|
||||
- 被订阅或用于计算的列或标签不可被删除(`ALTER table DROP`)、修改(`ALTER table MODIFY`)。
|
||||
- 若发生表结构变更,新增的列不出现在结果中,若发生列删除则会报错。
|
||||
- 若发生表结构变更,新增的列不出现在结果中。
|
||||
|
||||
### 超级表订阅
|
||||
|
||||
|
@ -269,10 +269,10 @@ CREATE TOPIC topic_name AS STABLE stb_name
|
|||
语法:
|
||||
|
||||
```sql
|
||||
CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
|
||||
CREATE TOPIC topic_name AS DATABASE db_name;
|
||||
```
|
||||
|
||||
通过该语句可创建一个包含数据库所有表数据的订阅,`WITH META` 可选择将数据库结构变动信息加入到订阅消息流,TMQ 将消费当前数据库下所有表结构的变动,包括超级表的创建与删除,列添加、删除或修改,子表的创建、删除及 TAG 变动信息等等。消费者可通过 API 来判断具体的消息类型。这一点也是与 Kafka 不同的地方。
|
||||
通过该语句可创建一个包含数据库所有表数据的订阅
|
||||
|
||||
## 创建消费者 *consumer*
|
||||
|
||||
|
@ -282,16 +282,16 @@ CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;
|
|||
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
|
||||
| `td.connect.ip` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td.connect.user` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td.connect.pass` | string | 用于创建连接,同 `taos_connect` |
|
||||
| `td.connect.port` | integer | 用于创建连接,同 `taos_connect` |
|
||||
| `td.connect.pass` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `td.connect.port` | integer | 用于创建连接,同 `taos_connect` | |
|
||||
| `group.id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
|
||||
| `client.id` | string | 客户端 ID | 最大长度:192。 |
|
||||
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
|
||||
| `enable.auto.commit` | boolean | 启用自动提交 | 合法值:`true`, `false`。 |
|
||||
| `auto.commit.interval.ms` | integer | 以毫秒为单位的自动提交时间间隔 |
|
||||
| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | |
|
||||
| `experimental.snapshot.enable` | boolean | 从 WAL 开始消费,还是从 TSBS 开始消费 | |
|
||||
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名 |
|
||||
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
|
||||
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交 | 合法值:`true`, `false`。 |
|
||||
| `auto.commit.interval.ms` | integer | 以毫秒为单位的消费记录自动提交消费位点时间间 | 默认 5000 m |
|
||||
| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 默认开启 |
|
||||
| `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据 | 实验功能,默认关闭 |
|
||||
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) | |
|
||||
|
||||
对于不同编程语言,其设置方式如下:
|
||||
|
||||
|
@ -430,13 +430,13 @@ Python 使用以下配置项创建一个 Consumer 实例。
|
|||
| `td_connect_port` | string | 用于创建连接,同 `taos_connect` | |
|
||||
| `group_id` | string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。 |
|
||||
| `client_id` | string | 客户端 ID | 最大长度:192。 |
|
||||
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`, `latest`, `none`(default) |
|
||||
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`。 |
|
||||
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | |
|
||||
| `auto_offset_reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
|
||||
| `enable_auto_commit` | string | 启用自动提交 | 合法值:`true`, `false`,默认为 true |
|
||||
| `auto_commit_interval_ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms |
|
||||
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
|
||||
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
|
||||
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
|
||||
| `timeout` | int | 消费者拉去的超时时间 | |
|
||||
| `experimental_snapshot_enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
|
||||
| `msg_with_table_name` | string | 是否允许从消息中解析表名,不适用于列订阅 | 合法值:`true`, `false` |
|
||||
| `timeout` | int | 消费者拉取数据的超时时间 | |
|
||||
|
||||
</TabItem>
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ typedef enum {
|
|||
WRITE_QUEUE,
|
||||
APPLY_QUEUE,
|
||||
SYNC_QUEUE,
|
||||
SYNC_CTRL_QUEUE,
|
||||
STREAM_QUEUE,
|
||||
QUEUE_MAX,
|
||||
} EQueueType;
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
typedef struct STSRow {
|
||||
TSKEY ts;
|
||||
union {
|
||||
uint32_t info;
|
||||
|
|
|
@ -695,6 +695,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
|||
int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
||||
int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
|
||||
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
|
||||
|
||||
// -----------------------------------------
|
||||
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
|
||||
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
|
|
|
@ -83,6 +83,12 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
|
|||
}
|
||||
}
|
||||
|
||||
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
|
||||
if (CODE) { \
|
||||
LINO = __LINE__; \
|
||||
goto LABEL; \
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -186,8 +186,8 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
|||
STscObj* pTscObj = (*pRequest)->pTscObj;
|
||||
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
|
||||
sizeof((*pRequest)->self))) {
|
||||
tscError("%" PRIx64 " failed to add to request container, reqId:0x%" PRIu64 ", conn:%" PRIx64 ", %s", (*pRequest)->self,
|
||||
(*pRequest)->requestId, pTscObj->id, sql);
|
||||
tscError("%" PRIx64 " failed to add to request container, reqId:0x%" PRIu64 ", conn:%" PRIx64 ", %s",
|
||||
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
|
||||
|
||||
taosMemoryFree(param);
|
||||
destroyRequest(*pRequest);
|
||||
|
@ -199,9 +199,8 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
|||
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
|
||||
if (TSDB_CODE_SUCCESS !=
|
||||
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
|
||||
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self,
|
||||
(*pRequest)->requestId, pTscObj->id, sql);
|
||||
|
||||
tscError("%" PRId64 " failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s",
|
||||
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
|
||||
destroyRequest(*pRequest);
|
||||
*pRequest = NULL;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
@ -1040,39 +1039,39 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
SSqlCallbackWrapper* pWrapper) {
|
||||
pRequest->type = pQuery->msgType;
|
||||
|
||||
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||
|
||||
SPlanContext cxt = {.queryId = pRequest->requestId,
|
||||
.acctId = pRequest->pTscObj->acctId,
|
||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||
.pAstRoot = pQuery->pRoot,
|
||||
.showRewrite = pQuery->showRewrite,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.pUser = pRequest->pTscObj->user,
|
||||
.sysInfo = pRequest->pTscObj->sysInfo,
|
||||
.allocatorId = pRequest->allocatorRefId};
|
||||
.acctId = pRequest->pTscObj->acctId,
|
||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||
.pAstRoot = pQuery->pRoot,
|
||||
.showRewrite = pQuery->showRewrite,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.pUser = pRequest->pTscObj->user,
|
||||
.sysInfo = pRequest->pTscObj->sysInfo,
|
||||
.allocatorId = pRequest->allocatorRefId};
|
||||
|
||||
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
||||
SQueryPlan* pDag = NULL;
|
||||
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
|
||||
SQueryPlan* pDag = NULL;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
|
||||
if (code) {
|
||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
} else {
|
||||
pRequest->body.subplanNum = pDag->numOfSubplans;
|
||||
}
|
||||
if (code) {
|
||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
} else {
|
||||
pRequest->body.subplanNum = pDag->numOfSubplans;
|
||||
}
|
||||
|
||||
pRequest->metric.planEnd = taosGetTimestampUs();
|
||||
pRequest->metric.planEnd = taosGetTimestampUs();
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
tscDebug("0x%" PRIx64 " create query plan success, elapsed time:%.2f ms, 0x%" PRIx64, pRequest->self,
|
||||
(pRequest->metric.planEnd - st)/1000.0, pRequest->requestId);
|
||||
(pRequest->metric.planEnd - st) / 1000.0, pRequest->requestId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
|
||||
SArray* pNodeList = NULL;
|
||||
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
||||
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
|
||||
SArray* pNodeList = NULL;
|
||||
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
||||
|
||||
SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
|
||||
.requestId = pRequest->requestId,
|
||||
|
@ -2262,7 +2261,7 @@ void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp,
|
|||
if (fp) {
|
||||
fp(param, NULL, terrno);
|
||||
}
|
||||
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ typedef struct SVnodeMgmt {
|
|||
SQWorkerPool streamPool;
|
||||
SWWorkerPool fetchPool;
|
||||
SWWorkerPool syncPool;
|
||||
SWWorkerPool syncCtrlPool;
|
||||
SWWorkerPool writePool;
|
||||
SWWorkerPool applyPool;
|
||||
SSingleWorker mgmtWorker;
|
||||
|
@ -60,6 +61,7 @@ typedef struct {
|
|||
SVnode *pImpl;
|
||||
STaosQueue *pWriteQ;
|
||||
STaosQueue *pSyncQ;
|
||||
STaosQueue *pSyncCtrlQ;
|
||||
STaosQueue *pApplyQ;
|
||||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pStreamQ;
|
||||
|
@ -106,6 +108,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
|
|||
|
||||
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -422,6 +422,8 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
|
|
@ -217,17 +217,80 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
|
|||
}
|
||||
}
|
||||
|
||||
static void *vmCloseVnodeInThread(void *param) {
|
||||
SVnodeThread *pThread = param;
|
||||
SVnodeMgmt *pMgmt = pThread->pMgmt;
|
||||
|
||||
dInfo("thread:%d, start to close %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
||||
setThreadName("close-vnodes");
|
||||
|
||||
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
||||
SVnodeObj *pVnode = pThread->ppVnodes[v];
|
||||
|
||||
char stepDesc[TSDB_STEP_DESC_LEN] = {0};
|
||||
snprintf(stepDesc, TSDB_STEP_DESC_LEN, "vgId:%d, start to close, %d of %d have been closed", pVnode->vgId,
|
||||
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
|
||||
tmsgReportStartup("vnode-close", stepDesc);
|
||||
|
||||
vmCloseVnode(pMgmt, pVnode);
|
||||
}
|
||||
|
||||
dInfo("thread:%d, numOfVnodes:%d is closed", pThread->threadIndex, pThread->vnodeNum);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
||||
dInfo("start to close all vnodes");
|
||||
|
||||
int32_t numOfVnodes = 0;
|
||||
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
|
||||
|
||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||
if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
|
||||
vmCloseVnode(pMgmt, ppVnodes[i]);
|
||||
int32_t threadNum = tsNumOfCores / 2;
|
||||
if (threadNum < 1) threadNum = 1;
|
||||
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||
|
||||
SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
threads[t].threadIndex = t;
|
||||
threads[t].pMgmt = pMgmt;
|
||||
threads[t].ppVnodes = taosMemoryCalloc(vnodesPerThread, sizeof(SVnode *));
|
||||
}
|
||||
|
||||
for (int32_t v = 0; v < numOfVnodes; ++v) {
|
||||
int32_t t = v % threadNum;
|
||||
SVnodeThread *pThread = &threads[t];
|
||||
if (pThread->ppVnodes != NULL && ppVnodes != NULL) {
|
||||
pThread->ppVnodes[pThread->vnodeNum++] = ppVnodes[v];
|
||||
}
|
||||
}
|
||||
|
||||
pMgmt->state.openVnodes = 0;
|
||||
dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
|
||||
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
SVnodeThread *pThread = &threads[t];
|
||||
if (pThread->vnodeNum == 0) continue;
|
||||
|
||||
TdThreadAttr thAttr;
|
||||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
|
||||
dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
|
||||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
}
|
||||
|
||||
for (int32_t t = 0; t < threadNum; ++t) {
|
||||
SVnodeThread *pThread = &threads[t];
|
||||
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
|
||||
taosThreadJoin(pThread->thread, NULL);
|
||||
taosThreadClear(&pThread->thread);
|
||||
}
|
||||
taosMemoryFree(pThread->ppVnodes);
|
||||
}
|
||||
taosMemoryFree(threads);
|
||||
|
||||
if (ppVnodes != NULL) {
|
||||
taosMemoryFree(ppVnodes);
|
||||
}
|
||||
|
|
|
@ -136,6 +136,22 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessSyncCtrlQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
|
||||
|
||||
int32_t code = vnodeProcessSyncCtrlMsg(pVnode->pImpl, pMsg, NULL); // no response here
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
|
@ -203,6 +219,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
dGTrace("vgId:%d, msg:%p put into vnode-sync queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||
break;
|
||||
case SYNC_CTRL_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-sync-ctrl queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pSyncCtrlQ, pMsg);
|
||||
break;
|
||||
case APPLY_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-apply queue", pVnode->vgId, pMsg);
|
||||
taosWriteQitem(pVnode->pApplyQ, pMsg);
|
||||
|
@ -219,6 +239,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
|
||||
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToSyncCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return vmPutMsgToQueue(pMgmt, pMsg, SYNC_CTRL_QUEUE);
|
||||
}
|
||||
|
||||
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); }
|
||||
|
@ -301,6 +325,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
|
||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||
pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncCtrlQueue);
|
||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||
|
@ -325,6 +350,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
||||
tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ);
|
||||
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||
tWWorkerFreeQueue(&pMgmt->syncCtrlPool, pVnode->pSyncCtrlQ);
|
||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
||||
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
|
@ -370,6 +396,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
pSPool->max = tsNumOfVnodeSyncThreads;
|
||||
if (tWWorkerInit(pSPool) != 0) return -1;
|
||||
|
||||
SWWorkerPool *pSCPool = &pMgmt->syncCtrlPool;
|
||||
pSCPool->name = "vnode-sync-ctrl";
|
||||
pSCPool->max = tsNumOfVnodeSyncThreads;
|
||||
if (tWWorkerInit(pSCPool) != 0) return -1;
|
||||
|
||||
SSingleWorkerCfg mgmtCfg = {
|
||||
.min = 1,
|
||||
.max = 1,
|
||||
|
@ -398,6 +429,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
|
|||
tWWorkerCleanup(&pMgmt->writePool);
|
||||
tWWorkerCleanup(&pMgmt->applyPool);
|
||||
tWWorkerCleanup(&pMgmt->syncPool);
|
||||
tWWorkerCleanup(&pMgmt->syncCtrlPool);
|
||||
tQWorkerCleanup(&pMgmt->queryPool);
|
||||
tQWorkerCleanup(&pMgmt->streamPool);
|
||||
tWWorkerCleanup(&pMgmt->fetchPool);
|
||||
|
|
|
@ -2555,7 +2555,7 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
|||
int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
|
||||
char *sep = ", ";
|
||||
int32_t sepLen = strlen(sep);
|
||||
int32_t rollupLen = sizeof(rollup) - 2;
|
||||
int32_t rollupLen = sizeof(rollup) - VARSTR_HEADER_SIZE - 2;
|
||||
for (int32_t i = 0; i < rollupNum; ++i) {
|
||||
char *funcName = taosArrayGet(pStb->pFuncs, i);
|
||||
if (i) {
|
||||
|
|
|
@ -82,6 +82,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
|||
|
||||
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
|
|
|
@ -32,12 +32,6 @@ extern "C" {
|
|||
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSD ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||
// clang-format on
|
||||
|
||||
#define TSDB_CHECK_CODE(CODE, LINO, LABEL) \
|
||||
if (CODE) { \
|
||||
LINO = __LINE__; \
|
||||
goto LABEL; \
|
||||
}
|
||||
|
||||
typedef struct TSDBROW TSDBROW;
|
||||
typedef struct TABLEID TABLEID;
|
||||
typedef struct TSDBKEY TSDBKEY;
|
||||
|
@ -247,18 +241,17 @@ void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, ch
|
|||
// SDelFile
|
||||
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
|
||||
// tsdbFS.c ==============================================================================================
|
||||
int32_t tsdbFSOpen(STsdb *pTsdb);
|
||||
int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback);
|
||||
int32_t tsdbFSClose(STsdb *pTsdb);
|
||||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbFSDestroy(STsdbFS *pFS);
|
||||
int32_t tDFileSetCmprFn(const void *p1, const void *p2);
|
||||
int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSCommit(STsdb *pTsdb);
|
||||
int32_t tsdbFSRollback(STsdb *pTsdb);
|
||||
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS);
|
||||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS);
|
||||
|
||||
int32_t tsdbFSRollback(STsdbFS *pFS);
|
||||
|
||||
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet);
|
||||
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
|
||||
// tsdbReaderWriter.c ==============================================================================================
|
||||
|
|
|
@ -87,11 +87,13 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
|
|||
int32_t vnodeBegin(SVnode* pVnode);
|
||||
int32_t vnodeShouldCommit(SVnode* pVnode);
|
||||
int32_t vnodeCommit(SVnode* pVnode);
|
||||
void vnodeRollback(SVnode* pVnode);
|
||||
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
|
||||
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
|
||||
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
|
||||
int32_t vnodeSyncCommit(SVnode* pVnode);
|
||||
int32_t vnodeAsyncCommit(SVnode* pVnode);
|
||||
bool vnodeShouldRollback(SVnode* pVnode);
|
||||
|
||||
// vnodeSync.c
|
||||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||
|
|
|
@ -97,10 +97,11 @@ typedef struct SMCtbCursor SMCtbCursor;
|
|||
typedef struct SMStbCursor SMStbCursor;
|
||||
typedef struct STbUidStore STbUidStore;
|
||||
|
||||
int metaOpen(SVnode* pVnode, SMeta** ppMeta);
|
||||
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
|
||||
int metaClose(SMeta* pMeta);
|
||||
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
||||
int metaCommit(SMeta* pMeta);
|
||||
int metaFinishCommit(SMeta* pMeta);
|
||||
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
|
||||
|
@ -149,10 +150,12 @@ typedef struct {
|
|||
int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
|
||||
|
||||
// tsdb
|
||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg);
|
||||
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
|
||||
int tsdbClose(STsdb** pTsdb);
|
||||
int32_t tsdbBegin(STsdb* pTsdb);
|
||||
int32_t tsdbCommit(STsdb* pTsdb);
|
||||
int32_t tsdbFinishCommit(STsdb* pTsdb);
|
||||
int32_t tsdbRollbackCommit(STsdb* pTsdb);
|
||||
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
|
||||
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
|
||||
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||
|
@ -200,7 +203,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
|
|||
// sma
|
||||
int32_t smaInit();
|
||||
void smaCleanUp();
|
||||
int32_t smaOpen(SVnode* pVnode);
|
||||
int32_t smaOpen(SVnode* pVnode, int8_t rollback);
|
||||
int32_t smaClose(SSma* pSma);
|
||||
int32_t smaBegin(SSma* pSma);
|
||||
int32_t smaSyncPreCommit(SSma* pSma);
|
||||
|
|
|
@ -34,6 +34,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) {
|
|||
|
||||
// commit the meta txn
|
||||
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); }
|
||||
int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); }
|
||||
|
||||
// abort the meta txn
|
||||
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); }
|
||||
|
|
|
@ -27,7 +27,7 @@ static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int k
|
|||
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
|
||||
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
|
||||
|
||||
int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
||||
int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||
SMeta *pMeta = NULL;
|
||||
int ret;
|
||||
int slen;
|
||||
|
@ -60,49 +60,49 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
|||
taosMkDir(pMeta->path);
|
||||
|
||||
// open env
|
||||
ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv);
|
||||
ret = tdbOpen(pMeta->path, pVnode->config.szPage, pVnode->config.szCache, &pMeta->pEnv, rollback);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta env since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open pTbDb
|
||||
ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb);
|
||||
ret = tdbTbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open pSkmDb
|
||||
ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb);
|
||||
ret = tdbTbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open pUidIdx
|
||||
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
|
||||
ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open pNameIdx
|
||||
ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx);
|
||||
ret = tdbTbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open pCtbIdx
|
||||
ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
|
||||
ret = tdbTbOpen("ctb.idx", sizeof(SCtbIdxKey), -1, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open pSuidIdx
|
||||
ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx);
|
||||
ret = tdbTbOpen("suid.idx", sizeof(tb_uid_t), 0, uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pSuidIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta super table index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
|
@ -119,27 +119,27 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
|
||||
ret = tdbTbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open pTtlIdx
|
||||
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
|
||||
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open pSmaIdx
|
||||
ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
|
||||
ret = tdbTbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb);
|
||||
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pStreamDb, 0);
|
||||
if (ret < 0) {
|
||||
metaError("vgId:%d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
|
|
|
@ -165,6 +165,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
|
|||
} else {
|
||||
code = metaCommit(pWriter->pMeta);
|
||||
if (code) goto _err;
|
||||
code = metaFinishCommit(pWriter->pMeta);
|
||||
if (code) goto _err;
|
||||
}
|
||||
taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
|
|
|
@ -29,19 +29,19 @@ static int32_t rsmaRestore(SSma *pSma);
|
|||
pKeepCfg->days = smaEvalDays(v, pCfg->retentions, l, pCfg->precision, pCfg->days); \
|
||||
} while (0)
|
||||
|
||||
#define SMA_OPEN_RSMA_IMPL(v, l) \
|
||||
do { \
|
||||
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
|
||||
if (!RETENTION_VALID(r)) { \
|
||||
if (l == 0) { \
|
||||
goto _err; \
|
||||
} \
|
||||
break; \
|
||||
} \
|
||||
smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
|
||||
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg) < 0) { \
|
||||
goto _err; \
|
||||
} \
|
||||
#define SMA_OPEN_RSMA_IMPL(v, l) \
|
||||
do { \
|
||||
SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \
|
||||
if (!RETENTION_VALID(r)) { \
|
||||
if (l == 0) { \
|
||||
goto _err; \
|
||||
} \
|
||||
break; \
|
||||
} \
|
||||
smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \
|
||||
if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \
|
||||
goto _err; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
/**
|
||||
|
@ -119,7 +119,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t smaOpen(SVnode *pVnode) {
|
||||
int32_t smaOpen(SVnode *pVnode, int8_t rollback) {
|
||||
STsdbCfg *pCfg = &pVnode->config.tsdbCfg;
|
||||
|
||||
ASSERT(!pVnode->pSma);
|
||||
|
|
|
@ -70,17 +70,17 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
|
|||
}
|
||||
|
||||
int32_t tqMetaOpen(STQ* pTq) {
|
||||
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) {
|
||||
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) {
|
||||
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) {
|
||||
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -1041,38 +1041,20 @@ _exit:
|
|||
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
STsdb *pTsdb = pCommitter->pTsdb;
|
||||
|
||||
STsdb *pTsdb = pCommitter->pTsdb;
|
||||
SMemTable *pMemTable = pTsdb->imem;
|
||||
|
||||
ASSERT(eno == 0 &&
|
||||
"tsdbCommit failure"
|
||||
"Restart taosd");
|
||||
|
||||
code = tsdbFSCommit1(pTsdb, &pCommitter->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
// commit or rollback
|
||||
code = tsdbFSCommit2(pTsdb, &pCommitter->fs);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
if (eno) {
|
||||
code = eno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pTsdb->imem = NULL;
|
||||
|
||||
// unlock
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
|
||||
tsdbUnrefMemTable(pMemTable);
|
||||
_exit:
|
||||
tsdbFSDestroy(&pCommitter->fs);
|
||||
taosArrayDestroy(pCommitter->aTbDataP);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
if (code || eno) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb end commit", TD_VID(pTsdb->pVnode));
|
||||
|
@ -1646,3 +1628,50 @@ _exit:
|
|||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFinishCommit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SMemTable *pMemTable = pTsdb->imem;
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
code = tsdbFSCommit(pTsdb);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pTsdb->imem = NULL;
|
||||
|
||||
// unlock
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
if (pMemTable) {
|
||||
tsdbUnrefMemTable(pMemTable);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb finish commit", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbRollbackCommit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbFSRollback(pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbInfo("vgId:%d tsdb rollback commit", TD_VID(pTsdb->pVnode));
|
||||
}
|
||||
return code;
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -33,7 +33,7 @@ int32_t tsdbSetKeepCfg(STsdb *pTsdb, STsdbCfg *pCfg) {
|
|||
* @param dir
|
||||
* @return int
|
||||
*/
|
||||
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg) {
|
||||
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKeepCfg, int8_t rollback) {
|
||||
STsdb *pTsdb = NULL;
|
||||
int slen = 0;
|
||||
|
||||
|
@ -66,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
|||
}
|
||||
|
||||
// open tsdb
|
||||
if (tsdbFSOpen(pTsdb) < 0) {
|
||||
if (tsdbFSOpen(pTsdb, rollback) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
|
|
@ -1334,8 +1334,8 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
|
|||
_exit:
|
||||
if (code) {
|
||||
if (pDelFWriter) {
|
||||
taosMemoryFree(pDelFWriter);
|
||||
tsdbCloseFile(&pDelFWriter->pWriteH);
|
||||
taosMemoryFree(pDelFWriter);
|
||||
}
|
||||
*ppWriter = NULL;
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
|
||||
|
|
|
@ -86,12 +86,12 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
|
|||
}
|
||||
|
||||
// do change fs
|
||||
code = tsdbFSCommit1(pTsdb, &fs);
|
||||
code = tsdbFSPrepareCommit(pTsdb, &fs);
|
||||
if (code) goto _err;
|
||||
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
code = tsdbFSCommit2(pTsdb, &fs);
|
||||
code = tsdbFSCommit(pTsdb);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _err;
|
||||
|
|
|
@ -517,8 +517,8 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), lino, tstrerror(code),
|
||||
pTsdb->path);
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, TSDB path: %s", TD_VID(pTsdb->pVnode), __func__, lino,
|
||||
tstrerror(code), pTsdb->path);
|
||||
*ppReader = NULL;
|
||||
|
||||
if (pReader) {
|
||||
|
@ -1380,13 +1380,13 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
|
|||
code = tsdbSnapWriteDelEnd(pWriter);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs);
|
||||
code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
|
||||
if (code) goto _err;
|
||||
|
||||
// lock
|
||||
taosThreadRwlockWrlock(&pTsdb->rwLock);
|
||||
|
||||
code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs);
|
||||
code = tsdbFSCommit(pWriter->pTsdb);
|
||||
if (code) {
|
||||
taosThreadRwlockUnlock(&pTsdb->rwLock);
|
||||
goto _err;
|
||||
|
|
|
@ -20,8 +20,6 @@
|
|||
|
||||
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
|
||||
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
|
||||
static int vnodeStartCommit(SVnode *pVnode);
|
||||
static int vnodeEndCommit(SVnode *pVnode);
|
||||
static int vnodeCommitImpl(void *arg);
|
||||
static void vnodeWaitCommit(SVnode *pVnode);
|
||||
|
||||
|
@ -215,6 +213,8 @@ int vnodeSyncCommit(SVnode *pVnode) {
|
|||
}
|
||||
|
||||
int vnodeCommit(SVnode *pVnode) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SVnodeInfo info = {0};
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
|
||||
|
@ -234,8 +234,8 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
|
||||
}
|
||||
if (vnodeSaveInfo(dir, &info) < 0) {
|
||||
vError("vgId:%d, failed to save vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
walBeginSnapshot(pVnode->pWal, pVnode->state.applied);
|
||||
|
||||
|
@ -249,8 +249,8 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
|
||||
// commit each sub-system
|
||||
if (metaCommit(pVnode->pMeta) < 0) {
|
||||
vError("vgId:%d, failed to commit meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (VND_IS_RSMA(pVnode)) {
|
||||
|
@ -259,22 +259,26 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (tsdbCommit(pVnode->pTsdb) < 0) {
|
||||
vError("vgId:%d, failed to commit tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
code = tsdbCommit(pVnode->pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (tqCommit(pVnode->pTq) < 0) {
|
||||
vError("vgId:%d, failed to commit tq since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = TSDB_CODE_FAILED;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
// walCommit (TODO)
|
||||
|
||||
// commit info
|
||||
if (vnodeCommitInfo(dir, &info) < 0) {
|
||||
vError("vgId:%d, failed to commit vnode info since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
return -1;
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
tsdbFinishCommit(pVnode->pTsdb);
|
||||
|
||||
if (metaFinishCommit(pVnode->pMeta) < 0) {
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pVnode->state.committed = info.state.committed;
|
||||
|
@ -287,11 +291,31 @@ int vnodeCommit(SVnode *pVnode) {
|
|||
// apply the commit (TODO)
|
||||
walEndSnapshot(pVnode->pWal);
|
||||
|
||||
vInfo("vgId:%d, commit end", TD_VID(pVnode));
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
} else {
|
||||
vInfo("vgId:%d, commit end", TD_VID(pVnode));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool vnodeShouldRollback(SVnode *pVnode) {
|
||||
char tFName[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
|
||||
VND_INFO_FNAME_TMP);
|
||||
|
||||
return taosCheckExistFile(tFName);
|
||||
}
|
||||
|
||||
void vnodeRollback(SVnode *pVnode) {
|
||||
char tFName[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(tFName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
|
||||
VND_INFO_FNAME_TMP);
|
||||
|
||||
(void)taosRemoveFile(tFName);
|
||||
}
|
||||
|
||||
static int vnodeCommitImpl(void *arg) {
|
||||
SVnode *pVnode = (SVnode *)arg;
|
||||
|
||||
|
@ -304,16 +328,6 @@ static int vnodeCommitImpl(void *arg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeStartCommit(SVnode *pVnode) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeEndCommit(SVnode *pVnode) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
|
||||
|
||||
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
|
||||
|
|
|
@ -110,6 +110,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
taosThreadMutexInit(&pVnode->mutex, NULL);
|
||||
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
|
||||
|
||||
int8_t rollback = vnodeShouldRollback(pVnode);
|
||||
|
||||
// open buffer pool
|
||||
if (vnodeOpenBufPool(pVnode) < 0) {
|
||||
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
|
@ -117,19 +119,19 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
}
|
||||
|
||||
// open meta
|
||||
if (metaOpen(pVnode, &pVnode->pMeta) < 0) {
|
||||
if (metaOpen(pVnode, &pVnode->pMeta, rollback) < 0) {
|
||||
vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open tsdb
|
||||
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) {
|
||||
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL, rollback) < 0) {
|
||||
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open sma
|
||||
if (smaOpen(pVnode)) {
|
||||
if (smaOpen(pVnode, rollback)) {
|
||||
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
@ -153,14 +155,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
#if !VNODE_AS_LIB
|
||||
// open query
|
||||
if (vnodeQueryOpen(pVnode)) {
|
||||
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
|
||||
// vnode begin
|
||||
if (vnodeBegin(pVnode) < 0) {
|
||||
|
@ -169,13 +169,15 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
#if !VNODE_AS_LIB
|
||||
// open sync
|
||||
if (vnodeSyncOpen(pVnode, dir)) {
|
||||
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (rollback) {
|
||||
vnodeRollback(pVnode);
|
||||
}
|
||||
|
||||
return pVnode;
|
||||
|
||||
|
|
|
@ -53,7 +53,10 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
for (int32_t iReq = 0; iReq < nReqs; iReq++) {
|
||||
tb_uid_t uid = tGenIdPI64();
|
||||
char *name = NULL;
|
||||
tStartDecode(&dc);
|
||||
if (tStartDecode(&dc) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tDecodeI32v(&dc, NULL) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
|
|
|
@ -323,6 +323,49 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
int32_t code = 0;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
if (!syncEnvIsStart()) {
|
||||
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId);
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||
if (pSyncNode == NULL) {
|
||||
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
|
||||
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
|
||||
SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
|
||||
SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatReplyDestroy(pSyncMsg);
|
||||
|
||||
} else {
|
||||
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
|
||||
code = -1;
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||
code);
|
||||
syncNodeRelease(pSyncNode);
|
||||
if (code != 0 && terrno == 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
int32_t code = 0;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
|
|
@ -1095,7 +1095,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
|||
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
|
||||
SGroupResInfo* pGroupResInfo);
|
||||
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
|
||||
int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock,
|
||||
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
|
||||
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
|
||||
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
|
||||
|
|
|
@ -150,9 +150,15 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) {
|
|||
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
|
||||
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
|
||||
SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM);
|
||||
if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) {
|
||||
if (NULL == pBuf) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (!allocBuf(pDeleter, pInput, pBuf)) {
|
||||
taosFreeQitem(pBuf);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
toDataCacheEntry(pDeleter, pInput, pBuf);
|
||||
taosWriteQitem(pDeleter->pDataBlocks, pBuf);
|
||||
*pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false);
|
||||
|
@ -177,6 +183,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE
|
|||
|
||||
SDataDeleterBuf* pBuf = NULL;
|
||||
taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf);
|
||||
ASSERT(NULL != pBuf);
|
||||
memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf));
|
||||
taosFreeQitem(pBuf);
|
||||
|
||||
|
|
|
@ -324,6 +324,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
|
||||
if (code) {
|
||||
destroyDataSinker((SDataSinkHandle*)inserter);
|
||||
taosMemoryFree(inserter);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -569,6 +569,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
|||
}
|
||||
|
||||
pResult->info.groupId = pSrcBlock->info.groupId;
|
||||
memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
||||
|
||||
// if the source equals to the destination, it is to create a new column as the result of scalar
|
||||
// function or some operators.
|
||||
|
@ -4312,8 +4313,9 @@ int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, in
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pState, SSDataBlock* pBlock,
|
||||
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
|
||||
SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||
int32_t numOfExprs = pSup->numOfExprs;
|
||||
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
|
||||
|
@ -4338,6 +4340,31 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta
|
|||
|
||||
if (pBlock->info.groupId == 0) {
|
||||
pBlock->info.groupId = pKey->groupId;
|
||||
|
||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
} else {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
} else {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
} else {
|
||||
// current value belongs to different group, it can't be packed into one datablock
|
||||
if (pBlock->info.groupId != pKey->groupId) {
|
||||
|
@ -4383,4 +4410,4 @@ int32_t buildSessionResultDataBlock(SExecTaskInfo* pTaskInfo, SStreamState* pSta
|
|||
}
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1492,6 +1492,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
case STREAM_NORMAL:
|
||||
case STREAM_INVALID: {
|
||||
doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
|
||||
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
||||
pInfo->srcRowIndex = 0;
|
||||
} break;
|
||||
default:
|
||||
|
@ -1502,6 +1503,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
|
||||
doStreamFillImpl(pOperator);
|
||||
doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL);
|
||||
memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
|
||||
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
break;
|
||||
|
|
|
@ -4044,7 +4044,7 @@ void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroup
|
|||
|
||||
// clear the existed group id
|
||||
pBlock->info.groupId = 0;
|
||||
buildSessionResultDataBlock(pTaskInfo, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
|
||||
buildSessionResultDataBlock(pOperator, pState, pBlock, &pOperator->exprSupp, pGroupResInfo);
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||
|
@ -4088,6 +4088,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||
}
|
||||
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
|
@ -4617,6 +4623,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
printDataBlock(pBlock, "single state recv");
|
||||
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
|
|
|
@ -84,13 +84,12 @@ static void endTlvEncode(STlvEncoder* pEncoder, char** pMsg, int32_t* pLen) {
|
|||
*pMsg = pEncoder->pBuf;
|
||||
pEncoder->pBuf = NULL;
|
||||
*pLen = pEncoder->offset;
|
||||
// nodesWarn("encode tlv count = %d, tl size = %d", pEncoder->tlvCount, sizeof(STlv) * pEncoder->tlvCount);
|
||||
}
|
||||
|
||||
static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pValue, int32_t len) {
|
||||
int32_t tlvLen = sizeof(STlv) + len;
|
||||
if (pEncoder->offset + tlvLen > pEncoder->allocSize) {
|
||||
pEncoder->allocSize = TMAX(pEncoder->allocSize * 2, pEncoder->allocSize + pEncoder->offset + tlvLen);
|
||||
pEncoder->allocSize = TMAX(pEncoder->allocSize * 2, pEncoder->allocSize + tlvLen);
|
||||
void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize);
|
||||
if (NULL == pNewBuf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -241,6 +240,15 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pEncoder->offset + sizeof(STlv) > pEncoder->allocSize) {
|
||||
pEncoder->allocSize = TMAX(pEncoder->allocSize * 2, pEncoder->allocSize + sizeof(STlv));
|
||||
void* pNewBuf = taosMemoryRealloc(pEncoder->pBuf, pEncoder->allocSize);
|
||||
if (NULL == pNewBuf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pEncoder->pBuf = pNewBuf;
|
||||
}
|
||||
|
||||
int32_t start = pEncoder->offset;
|
||||
pEncoder->offset += sizeof(STlv);
|
||||
int32_t code = func(pObj, pEncoder);
|
||||
|
@ -307,7 +315,7 @@ static int32_t tlvDecodeImpl(STlv* pTlv, void* pValue, int32_t len) {
|
|||
}
|
||||
|
||||
static int32_t tlvDecodeValueImpl(STlvDecoder* pDecoder, void* pValue, int32_t len) {
|
||||
if (pDecoder->offset + len > pDecoder->bufSize) {
|
||||
if (len > pDecoder->bufSize - pDecoder->offset) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
memcpy(pValue, pDecoder->pBuf + pDecoder->offset, len);
|
||||
|
@ -911,6 +919,10 @@ static int32_t msgToDatum(STlv* pTlv, void* pObj) {
|
|||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY: {
|
||||
if (pTlv->len > pNode->node.resType.bytes + VARSTR_HEADER_SIZE) {
|
||||
code = TSDB_CODE_FAILED;
|
||||
break;
|
||||
}
|
||||
pNode->datum.p = taosMemoryCalloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
|
||||
if (NULL == pNode->datum.p) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -190,14 +190,23 @@ int32_t nodesReleaseAllocator(int64_t allocatorId) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (NULL == g_pNodeAllocator) {
|
||||
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
|
||||
if (NULL == pAllocator) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = taosThreadMutexTryLock(&pAllocator->mutex);
|
||||
if (EBUSY != code) {
|
||||
nodesError("allocator id %" PRIx64
|
||||
" release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator "
|
||||
"function is called!",
|
||||
allocatorId);
|
||||
if (0 == code) {
|
||||
taosThreadMutexUnlock(&pAllocator->mutex);
|
||||
}
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
SNodeAllocator* pAllocator = g_pNodeAllocator;
|
||||
|
||||
g_pNodeAllocator = NULL;
|
||||
taosThreadMutexUnlock(&pAllocator->mutex);
|
||||
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
|
||||
|
@ -1826,7 +1835,7 @@ static EDealRes collectFuncs(SNode* pNode, void* pContext) {
|
|||
if (QUERY_NODE_FUNCTION == nodeType(pNode) && pCxt->classifier(((SFunctionNode*)pNode)->funcId) &&
|
||||
!(((SExprNode*)pNode)->orderAlias)) {
|
||||
SExprNode* pExpr = (SExprNode*)pNode;
|
||||
if (NULL == taosHashGet(pCxt->pFuncsSet, &pExpr, POINTER_BYTES)) {
|
||||
if (NULL == taosHashGet(pCxt->pFuncsSet, &pExpr, sizeof(SExprNode*))) {
|
||||
pCxt->errCode = nodesListStrictAppend(pCxt->pFuncs, nodesCloneNode(pNode));
|
||||
taosHashPut(pCxt->pFuncsSet, &pExpr, POINTER_BYTES, &pExpr, POINTER_BYTES);
|
||||
}
|
||||
|
|
|
@ -1365,8 +1365,12 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, TdFilePtr fp, STableDataB
|
|||
strtolower(pLine, pLine);
|
||||
char* pRawSql = pCxt->pSql;
|
||||
pCxt->pSql = pLine;
|
||||
bool gotRow = false;
|
||||
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf));
|
||||
bool gotRow = false;
|
||||
int32_t code = parseOneRow(pCxt, pDataBlock, tinfo.precision, &gotRow, tmpTokenBuf);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
pCxt->pSql = pRawSql;
|
||||
return code;
|
||||
}
|
||||
if (gotRow) {
|
||||
pDataBlock->size += extendedRowSize; // len;
|
||||
(*numOfRows)++;
|
||||
|
|
|
@ -222,6 +222,21 @@ int32_t buildCreateTbMsg(STableDataBlocks* pBlocks, SVCreateTbReq* pCreateTbReq)
|
|||
return code;
|
||||
}
|
||||
|
||||
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pDataBlock->pData);
|
||||
// if (!pDataBlock->cloned) {
|
||||
// free the refcount for metermeta
|
||||
taosMemoryFreeClear(pDataBlock->pTableMeta);
|
||||
|
||||
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
|
||||
// }
|
||||
taosMemoryFreeClear(pDataBlock);
|
||||
}
|
||||
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32_t size, int32_t startOffset,
|
||||
int32_t rowSize, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList,
|
||||
SVCreateTbReq* pCreateTbReq) {
|
||||
|
@ -240,11 +255,13 @@ int32_t getDataBlockFromList(SHashObj* pHashList, void* id, int32_t idLen, int32
|
|||
if (NULL != pCreateTbReq && NULL != pCreateTbReq->ctb.pTag) {
|
||||
ret = buildCreateTbMsg(*dataBlocks, pCreateTbReq);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
destroyDataBlock(*dataBlocks);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
taosHashPut(pHashList, id, idLen, dataBlocks, POINTER_BYTES);
|
||||
// converting to 'const char*' is to handle coverity scan errors
|
||||
taosHashPut(pHashList, (const char*)id, idLen, (const char*)dataBlocks, POINTER_BYTES);
|
||||
if (pBlockList) {
|
||||
taosArrayPush(pBlockList, dataBlocks);
|
||||
}
|
||||
|
@ -266,21 +283,6 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
|||
return result;
|
||||
}
|
||||
|
||||
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pDataBlock->pData);
|
||||
// if (!pDataBlock->cloned) {
|
||||
// free the refcount for metermeta
|
||||
taosMemoryFreeClear(pDataBlock->pTableMeta);
|
||||
|
||||
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
|
||||
// }
|
||||
taosMemoryFreeClear(pDataBlock);
|
||||
}
|
||||
|
||||
void destroyBlockArrayList(SArray* pDataBlockList) {
|
||||
if (pDataBlockList == NULL) {
|
||||
return;
|
||||
|
|
|
@ -248,8 +248,12 @@ int32_t getNumOfTags(const STableMeta* pTableMeta) { return getTableInfo(pTableM
|
|||
STableComInfo getTableInfo(const STableMeta* pTableMeta) { return pTableMeta->tableInfo; }
|
||||
|
||||
STableMeta* tableMetaDup(const STableMeta* pTableMeta) {
|
||||
size_t size = TABLE_META_SIZE(pTableMeta);
|
||||
int32_t numOfFields = TABLE_TOTAL_COL_NUM(pTableMeta);
|
||||
if (numOfFields > TSDB_MAX_COLUMNS || numOfFields < TSDB_MIN_COLUMNS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size_t size = sizeof(STableMeta) + numOfFields * sizeof(SSchema);
|
||||
STableMeta* p = taosMemoryMalloc(size);
|
||||
memcpy(p, pTableMeta, size);
|
||||
return p;
|
||||
|
|
|
@ -32,23 +32,29 @@
|
|||
namespace {
|
||||
|
||||
void generateInformationSchema(MockCatalogService* mcs) {
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DNODES, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DNODES, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("id", TSDB_DATA_TYPE_INT)
|
||||
.addColumn("endpoint", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MNODES, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MNODES, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("id", TSDB_DATA_TYPE_INT)
|
||||
.addColumn("endpoint", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MODULES, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_MODULES, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("id", TSDB_DATA_TYPE_INT)
|
||||
.addColumn("endpoint", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_QNODES, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_QNODES, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("id", TSDB_DATA_TYPE_INT)
|
||||
.addColumn("endpoint", TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DATABASES, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DATABASES, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
|
||||
.addColumn("create_time", TSDB_DATA_TYPE_TIMESTAMP)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_FUNCTIONS, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_FUNCTIONS, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("name", TSDB_DATA_TYPE_BINARY, TSDB_FUNC_NAME_LEN)
|
||||
.addColumn("aggregate", TSDB_DATA_TYPE_INT)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_INDEXES, TSDB_SYSTEM_TABLE, 3)
|
||||
.addColumn("index_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
|
||||
|
@ -63,22 +69,28 @@ void generateInformationSchema(MockCatalogService* mcs) {
|
|||
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
|
||||
.addColumn("table_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLE_DISTRIBUTED, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_TABLE_DISTRIBUTED, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
|
||||
.addColumn("table_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USERS, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_USERS, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("name", TSDB_DATA_TYPE_BINARY, TSDB_USER_LEN)
|
||||
.addColumn("super", TSDB_DATA_TYPE_TINYINT)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VGROUPS, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VGROUPS, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("vgroup_id", TSDB_DATA_TYPE_INT)
|
||||
.addColumn("db_name", TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CONFIGS, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CONFIGS, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("name", TSDB_DATA_TYPE_BINARY, TSDB_CONFIG_OPTION_LEN)
|
||||
.addColumn("value", TSDB_DATA_TYPE_BINARY, TSDB_CONFIG_VALUE_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DNODE_VARIABLES, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("dnode_id", TSDB_DATA_TYPE_INT)
|
||||
.addColumn("name", TSDB_DATA_TYPE_BINARY, TSDB_CONFIG_OPTION_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_DNODE_VARIABLES, TSDB_SYSTEM_TABLE, 1)
|
||||
.addColumn("dnode_id", TSDB_DATA_TYPE_INT)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CLUSTER, TSDB_SYSTEM_TABLE, 1)
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_CLUSTER, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("id", TSDB_DATA_TYPE_BIGINT)
|
||||
.addColumn("name", TSDB_DATA_TYPE_BINARY, TSDB_CLUSTER_ID_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_VNODES, TSDB_SYSTEM_TABLE, 2)
|
||||
|
@ -92,30 +104,22 @@ void generateInformationSchema(MockCatalogService* mcs) {
|
|||
}
|
||||
|
||||
void generatePerformanceSchema(MockCatalogService* mcs) {
|
||||
{
|
||||
ITableBuilder& builder =
|
||||
mcs->createTableBuilder(TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TRANS, TSDB_SYSTEM_TABLE, 1)
|
||||
.addColumn("id", TSDB_DATA_TYPE_INT);
|
||||
builder.done();
|
||||
}
|
||||
{
|
||||
ITableBuilder& builder =
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_STREAMS, TSDB_SYSTEM_TABLE, 1)
|
||||
.addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN);
|
||||
builder.done();
|
||||
}
|
||||
{
|
||||
ITableBuilder& builder =
|
||||
mcs->createTableBuilder(TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_CONSUMERS, TSDB_SYSTEM_TABLE, 1)
|
||||
.addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN);
|
||||
builder.done();
|
||||
}
|
||||
{
|
||||
ITableBuilder& builder =
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_SUBSCRIPTIONS, TSDB_SYSTEM_TABLE, 1)
|
||||
.addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN);
|
||||
builder.done();
|
||||
}
|
||||
mcs->createTableBuilder(TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_TRANS, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("id", TSDB_DATA_TYPE_INT)
|
||||
.addColumn("create_time", TSDB_DATA_TYPE_TIMESTAMP)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_STREAMS, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("stream_name", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
|
||||
.addColumn("create_time", TSDB_DATA_TYPE_TIMESTAMP)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_CONSUMERS, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("consumer_id", TSDB_DATA_TYPE_BIGINT)
|
||||
.addColumn("consumer_group", TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN)
|
||||
.done();
|
||||
mcs->createTableBuilder(TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_SUBSCRIPTIONS, TSDB_SYSTEM_TABLE, 2)
|
||||
.addColumn("vgroup_id", TSDB_DATA_TYPE_INT)
|
||||
.addColumn("consumer_id", TSDB_DATA_TYPE_BIGINT)
|
||||
.done();
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -423,13 +423,14 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if ((pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) > TSDB_MAX_COL_TAG_NUM) {
|
||||
int32_t numOfField = pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags;
|
||||
if (numOfField > TSDB_MAX_COL_TAG_NUM || numOfField < TSDB_MIN_COLUMNS) {
|
||||
*pDst = NULL;
|
||||
qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
|
||||
int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema);
|
||||
*pDst = taosMemoryMalloc(metaSize);
|
||||
if (NULL == *pDst) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
|
|
|
@ -137,6 +137,8 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
|
|||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
streamDispatch(pTask);
|
||||
}
|
||||
} else {
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
char* streamPath = taosMemoryCalloc(1, len);
|
||||
sprintf(streamPath, "%s/%s", path, "stream");
|
||||
pMeta->path = strdup(streamPath);
|
||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
|
||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
|
||||
taosMemoryFree(streamPath);
|
||||
goto _err;
|
||||
}
|
||||
|
@ -36,11 +36,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
taosMulModeMkDir(streamPath, 0755);
|
||||
taosMemoryFree(streamPath);
|
||||
|
||||
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
|
||||
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) {
|
||||
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
@ -262,12 +262,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
|
|||
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -99,26 +99,26 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
|
|||
memset(statePath, 0, 300);
|
||||
tstrncpy(statePath, path, 300);
|
||||
}
|
||||
if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) {
|
||||
if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open state storage backend
|
||||
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb) < 0) {
|
||||
if (tdbTbOpen("state.db", sizeof(SStateKey), -1, stateKeyCmpr, pState->db, &pState->pStateDb, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// todo refactor
|
||||
if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb) < 0) {
|
||||
if (tdbTbOpen("fill.state.db", sizeof(SWinKey), -1, winKeyCmpr, pState->db, &pState->pFillStateDb, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("session.state.db", sizeof(SStateSessionKey), -1, stateSessionKeyCmpr, pState->db,
|
||||
&pState->pSessionStateDb) < 0) {
|
||||
&pState->pSessionStateDb, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb) < 0) {
|
||||
if (tdbTbOpen("func.state.db", sizeof(STupleKey), -1, STupleKeyCmpr, pState->db, &pState->pFuncStateDb, 0) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
|
|
|
@ -119,7 +119,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
for (int32_t i = 0; i < epSz; i++) {
|
||||
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
|
||||
if (pInfo == NULL) return -1;
|
||||
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) return -1;
|
||||
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
|
||||
taosMemoryFreeClear(pInfo);
|
||||
return -1;
|
||||
}
|
||||
taosArrayPush(pTask->childEpInfo, &pInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -287,6 +287,12 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries
|
|||
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
||||
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
|
||||
|
||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
|
||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s);
|
||||
|
||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
||||
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
||||
|
||||
// for debug --------------
|
||||
void syncNodePrint(SSyncNode* pObj);
|
||||
void syncNodePrint2(char* s, SSyncNode* pObj);
|
||||
|
|
|
@ -2282,7 +2282,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
|
||||
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
//ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
|
||||
// ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
|
||||
syncNodeBecomeLeader(pSyncNode, "candidate to leader");
|
||||
|
||||
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
|
||||
|
@ -2676,7 +2676,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
|||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
syncNodeReplicate(ths, false);
|
||||
|
@ -2726,6 +2726,55 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
|
||||
syncLogRecvHeartbeat(ths, pMsg, "");
|
||||
|
||||
SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
|
||||
pMsgReply->destId = pMsg->srcId;
|
||||
pMsgReply->srcId = ths->myRaftId;
|
||||
pMsgReply->term = ths->pRaftStore->currentTerm;
|
||||
pMsgReply->privateTerm = 8864; // magic number
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||
|
||||
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||
syncNodeBecomeFollower(ths, "become follower by hb");
|
||||
}
|
||||
|
||||
if (pMsg->term == ths->pRaftStore->currentTerm) {
|
||||
// sInfo("vgId:%d, heartbeat reset timer", ths->vgId);
|
||||
syncNodeResetElectTimer(ths);
|
||||
|
||||
#if 0
|
||||
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||
syncNodeFollowerCommit(ths, pMsg->commitIndex);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
// htonl
|
||||
SMsgHead* pHead = rpcMsg.pCont;
|
||||
pHead->contLen = htonl(pHead->contLen);
|
||||
pHead->vgId = htonl(pHead->vgId);
|
||||
*/
|
||||
|
||||
// reply
|
||||
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
|
||||
syncLogRecvHeartbeatReply(ths, pMsg, "");
|
||||
|
||||
// update last reply time, make decision whether the other node is alive or not
|
||||
syncIndexMgrSetRecvTime(ths->pMatchIndex, &(pMsg->destId), pMsg->startTime);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TLA+ Spec
|
||||
// ClientRequest(i, v) ==
|
||||
// /\ state[i] = Leader
|
||||
|
@ -2754,7 +2803,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
|
|||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||
if (code != 0) {
|
||||
// del resp mgr, call FpCommitCb
|
||||
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||
sError("vgId:%d, failed to append log entry since %s", ths->vgId, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -3073,9 +3122,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
|||
} else {
|
||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d, failed to get log entry since %s. index:%lld", ths->vgId, tstrerror(terrno), i);
|
||||
return -1;
|
||||
}
|
||||
sError("vgId:%d, failed to get log entry since %s. index:%lld", ths->vgId, tstrerror(terrno), i);
|
||||
return -1;
|
||||
}
|
||||
ASSERT(pEntry != NULL);
|
||||
}
|
||||
|
||||
|
@ -3337,3 +3386,45 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
|
|||
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
}
|
||||
|
||||
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port,
|
||||
pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
}
|
||||
|
||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port,
|
||||
pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
}
|
||||
|
||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "send sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
|
||||
host, port, pMsg->term, pMsg->privateTerm, s);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
}
|
||||
|
||||
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
||||
char host[64];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
|
||||
host, port, pMsg->term, pMsg->privateTerm, s);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
}
|
|
@ -31,15 +31,17 @@ typedef struct STBC TBC;
|
|||
typedef struct STxn TXN;
|
||||
|
||||
// TDB
|
||||
int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb);
|
||||
int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback);
|
||||
int32_t tdbClose(TDB *pDb);
|
||||
int32_t tdbBegin(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbCommit(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbPostCommit(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbAbort(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbAlter(TDB *pDb, int pages);
|
||||
|
||||
// TTB
|
||||
int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb);
|
||||
int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb,
|
||||
int8_t rollback);
|
||||
int32_t tdbTbClose(TTB *pTb);
|
||||
int32_t tdbTbDrop(TTB *pTb);
|
||||
int32_t tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#include "tdbInt.h"
|
||||
|
||||
int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) {
|
||||
int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, int8_t rollback) {
|
||||
TDB *pDb;
|
||||
int dsize;
|
||||
int zsize;
|
||||
|
@ -66,7 +66,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) {
|
|||
|
||||
#ifdef USE_MAINDB
|
||||
// open main db
|
||||
ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb);
|
||||
ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb, rollback);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -129,6 +129,21 @@ int32_t tdbCommit(TDB *pDb, TXN *pTxn) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) {
|
||||
SPager *pPager;
|
||||
int ret;
|
||||
|
||||
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
|
||||
ret = tdbPagerPostCommit(pPager, pTxn);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to commit pager since %s. dbName:%s, txnId:%d", tstrerror(terrno), pDb->dbName, pTxn->txnId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
|
||||
SPager *pPager;
|
||||
int ret;
|
||||
|
|
|
@ -305,6 +305,18 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
|
||||
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
|
||||
tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pPager->inTran = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// recovery dirty pages
|
||||
int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
||||
SPage *pPage;
|
||||
|
@ -657,3 +669,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tdbPagerRollback(SPager *pPager) {
|
||||
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
|
||||
tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,8 @@ struct STBC {
|
|||
SBTC btc;
|
||||
};
|
||||
|
||||
int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb) {
|
||||
int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb,
|
||||
int8_t rollback) {
|
||||
TTB *pTb;
|
||||
SPager *pPager;
|
||||
int ret;
|
||||
|
@ -110,10 +111,14 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
|
|||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbPagerRestore(pPager, pTb->pBt);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pTb);
|
||||
return -1;
|
||||
if (rollback) {
|
||||
tdbPagerRollback(pPager);
|
||||
} else {
|
||||
ret = tdbPagerRestore(pPager, pTb->pBt);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pTb);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
*ppTb = pTb;
|
||||
|
|
|
@ -190,12 +190,14 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt);
|
|||
int tdbPagerWrite(SPager *pPager, SPage *pPage);
|
||||
int tdbPagerBegin(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerCommit(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerAbort(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
|
||||
TXN *pTxn);
|
||||
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
|
||||
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
|
||||
int tdbPagerRestore(SPager *pPager, SBTree *pBt);
|
||||
int tdbPagerRollback(SPager *pPager);
|
||||
|
||||
// tdbPCache.c ====================================
|
||||
#define TDB_PCACHE_PAGE \
|
||||
|
|
|
@ -140,7 +140,7 @@ static void generateBigVal(char *val, int valLen) {
|
|||
static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) {
|
||||
TDB *pEnv = NULL;
|
||||
|
||||
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv);
|
||||
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0);
|
||||
if (ret) {
|
||||
pEnv = NULL;
|
||||
}
|
||||
|
@ -162,8 +162,8 @@ static void insertOfp(void) {
|
|||
// open db
|
||||
TTB *pDb = NULL;
|
||||
tdb_cmpr_fn_t compFunc = tKeyCmpr;
|
||||
// ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
|
||||
ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb);
|
||||
// ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||
ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// open the pool
|
||||
|
@ -211,8 +211,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
|
|||
// open db
|
||||
TTB *pDb = NULL;
|
||||
tdb_cmpr_fn_t compFunc = tKeyCmpr;
|
||||
// int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
|
||||
int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb);
|
||||
// int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||
int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// generate value payload
|
||||
|
@ -253,7 +253,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) {
|
|||
// open db
|
||||
TTB *pDb = NULL;
|
||||
tdb_cmpr_fn_t compFunc = tKeyCmpr;
|
||||
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
|
||||
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// open the pool
|
||||
|
@ -354,12 +354,12 @@ TEST(tdb_test, simple_insert1) {
|
|||
taosRemoveDir("tdb");
|
||||
|
||||
// Open Env
|
||||
ret = tdbOpen("tdb", pageSize, 64, &pEnv);
|
||||
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// Create a database
|
||||
compFunc = tKeyCmpr;
|
||||
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
|
||||
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
{
|
||||
|
|
|
@ -130,12 +130,12 @@ TEST(tdb_test, DISABLED_simple_insert1) {
|
|||
taosRemoveDir("tdb");
|
||||
|
||||
// Open Env
|
||||
ret = tdbOpen("tdb", 4096, 64, &pEnv);
|
||||
ret = tdbOpen("tdb", 4096, 64, &pEnv, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// Create a database
|
||||
compFunc = tKeyCmpr;
|
||||
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
|
||||
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
{
|
||||
|
@ -250,12 +250,12 @@ TEST(tdb_test, DISABLED_simple_insert2) {
|
|||
taosRemoveDir("tdb");
|
||||
|
||||
// Open Env
|
||||
ret = tdbOpen("tdb", 1024, 10, &pEnv);
|
||||
ret = tdbOpen("tdb", 1024, 10, &pEnv, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// Create a database
|
||||
compFunc = tDefaultKeyCmpr;
|
||||
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
|
||||
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
{
|
||||
|
@ -346,11 +346,11 @@ TEST(tdb_test, DISABLED_simple_delete1) {
|
|||
pPool = openPool();
|
||||
|
||||
// open env
|
||||
ret = tdbOpen("tdb", 1024, 256, &pEnv);
|
||||
ret = tdbOpen("tdb", 1024, 256, &pEnv, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// open database
|
||||
ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
|
||||
ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||
|
@ -435,11 +435,11 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
|
|||
taosRemoveDir("tdb");
|
||||
|
||||
// open env
|
||||
ret = tdbOpen("tdb", 4096, 64, &pEnv);
|
||||
ret = tdbOpen("tdb", 4096, 64, &pEnv, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// open database
|
||||
ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
|
||||
ret = tdbTbOpen("db.db", -1, -1, NULL, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
pPool = openPool();
|
||||
|
@ -497,12 +497,12 @@ TEST(tdb_test, multi_thread_query) {
|
|||
taosRemoveDir("tdb");
|
||||
|
||||
// Open Env
|
||||
ret = tdbOpen("tdb", 4096, 10, &pEnv);
|
||||
ret = tdbOpen("tdb", 4096, 10, &pEnv, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
// Create a database
|
||||
compFunc = tKeyCmpr;
|
||||
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
|
||||
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
char key[64];
|
||||
|
@ -614,10 +614,10 @@ TEST(tdb_test, DISABLED_multi_thread1) {
|
|||
taosRemoveDir("tdb");
|
||||
|
||||
// Open Env
|
||||
ret = tdbOpen("tdb", 512, 1, &pDb);
|
||||
ret = tdbOpen("tdb", 512, 1, &pDb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb);
|
||||
ret = tdbTbOpen("db.db", -1, -1, NULL, pDb, &pTb, 0);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
|
||||
auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) {
|
||||
|
@ -726,4 +726,4 @@ TEST(tdb_test, DISABLED_multi_thread1) {
|
|||
ret = tdbClose(pDb);
|
||||
GTEST_ASSERT_EQ(ret, 0);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
|
|
@ -343,6 +343,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
|||
|
||||
TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile));
|
||||
if (pFile == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (fd >= 0) close(fd);
|
||||
if (fp != NULL) fclose(fp);
|
||||
return NULL;
|
||||
|
|
|
@ -26,19 +26,25 @@ static void *taosProcessSchedQueue(void *param);
|
|||
static void taosDumpSchedulerStatus(void *qhandle, void *tmrId);
|
||||
|
||||
void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *label, SSchedQueue *pSched) {
|
||||
bool schedMalloced = false;
|
||||
|
||||
if (NULL == pSched) {
|
||||
pSched = (SSchedQueue *)taosMemoryCalloc(sizeof(SSchedQueue), 1);
|
||||
if (pSched == NULL) {
|
||||
uError("%s: no enough memory for pSched", label);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
schedMalloced = true;
|
||||
}
|
||||
|
||||
pSched->queue = (SSchedMsg *)taosMemoryCalloc(sizeof(SSchedMsg), queueSize);
|
||||
if (pSched->queue == NULL) {
|
||||
uError("%s: no enough memory for queue", label);
|
||||
taosCleanUpScheduler(pSched);
|
||||
taosMemoryFree(pSched);
|
||||
if (schedMalloced) {
|
||||
taosMemoryFree(pSched);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -46,6 +52,9 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
|
|||
if (pSched->qthread == NULL) {
|
||||
uError("%s: no enough memory for qthread", label);
|
||||
taosCleanUpScheduler(pSched);
|
||||
if (schedMalloced) {
|
||||
taosMemoryFree(pSched);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -58,18 +67,27 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
|
|||
if (taosThreadMutexInit(&pSched->queueMutex, NULL) < 0) {
|
||||
uError("init %s:queueMutex failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
if (schedMalloced) {
|
||||
taosMemoryFree(pSched);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->emptySem, 0, (uint32_t)pSched->queueSize) != 0) {
|
||||
uError("init %s:empty semaphore failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
if (schedMalloced) {
|
||||
taosMemoryFree(pSched);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsem_init(&pSched->fullSem, 0, 0) != 0) {
|
||||
uError("init %s:full semaphore failed(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
if (schedMalloced) {
|
||||
taosMemoryFree(pSched);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -83,6 +101,9 @@ void *taosInitScheduler(int32_t queueSize, int32_t numOfThreads, const char *lab
|
|||
if (code != 0) {
|
||||
uError("%s: failed to create rpc thread(%s)", label, strerror(errno));
|
||||
taosCleanUpScheduler(pSched);
|
||||
if (schedMalloced) {
|
||||
taosMemoryFree(pSched);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
++pSched->numOfThreads;
|
||||
|
|
|
@ -313,7 +313,7 @@
|
|||
|
||||
# --- sma
|
||||
./test.sh -f tsim/sma/drop_sma.sim
|
||||
# ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
||||
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
|
||||
# temp disable
|
||||
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
|
||||
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
|
||||
|
|
|
@ -232,7 +232,7 @@ python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
|
|||
python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeStopLoop.py -N 5 -M 3
|
||||
# python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateDb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateDb.py -N 5 -M 3
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateDbRep3.py -N 5 -M 3
|
||||
|
@ -248,7 +248,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 5 -
|
|||
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 6 -M 3 -C 5
|
||||
# BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py
|
||||
# python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
|
||||
python3 test.py -f 6-cluster/5dnode3mnodeStopConnect.py -N 5 -M 3
|
||||
|
||||
python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 5 -M 3
|
||||
|
|
Loading…
Reference in New Issue