other: merge 3.0

This commit is contained in:
Haojun Liao 2023-05-13 20:16:55 +08:00
commit 170c603995
24 changed files with 298 additions and 753 deletions

View File

@ -102,7 +102,7 @@ sudo apt-get install tdengine
:::tip
This installation method is supported only for Debian and Ubuntu.
::::
:::
</TabItem>
<TabItem label="Windows" value="windows">

View File

@ -77,7 +77,7 @@ description: 一些常见问题的解决方法汇总
- Windows 系统请使用 PowerShell 命令 Test-NetConnection -ComputerName {fqdn} -Port {port} 检测服务段端口是否访问
11. 也可以使用 taos 程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅:[诊断及其他](https://docs.taosdata.com/3.0-preview/operation/diagnose/)。
11. 也可以使用 taos 程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅:[诊断及其他](../../operation/diagnose/)。
### 5. 遇到错误 Unable to resolve FQDN” 怎么办?

View File

@ -232,7 +232,8 @@ DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill);
DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type);
DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type);
/* --------------------------schemaless INTERFACE------------------------------- */

View File

@ -213,143 +213,141 @@
#define TK_REPLACE 195
#define TK_STREAM 196
#define TK_INTO 197
#define TK_PAUSE 198
#define TK_RESUME 199
#define TK_TRIGGER 200
#define TK_AT_ONCE 201
#define TK_WINDOW_CLOSE 202
#define TK_IGNORE 203
#define TK_EXPIRED 204
#define TK_FILL_HISTORY 205
#define TK_UPDATE 206
#define TK_SUBTABLE 207
#define TK_UNTREATED 208
#define TK_KILL 209
#define TK_CONNECTION 210
#define TK_TRANSACTION 211
#define TK_BALANCE 212
#define TK_VGROUP 213
#define TK_LEADER 214
#define TK_MERGE 215
#define TK_REDISTRIBUTE 216
#define TK_SPLIT 217
#define TK_DELETE 218
#define TK_INSERT 219
#define TK_NULL 220
#define TK_NK_QUESTION 221
#define TK_NK_ARROW 222
#define TK_ROWTS 223
#define TK_QSTART 224
#define TK_QEND 225
#define TK_QDURATION 226
#define TK_WSTART 227
#define TK_WEND 228
#define TK_WDURATION 229
#define TK_IROWTS 230
#define TK_ISFILLED 231
#define TK_CAST 232
#define TK_NOW 233
#define TK_TODAY 234
#define TK_TIMEZONE 235
#define TK_CLIENT_VERSION 236
#define TK_SERVER_VERSION 237
#define TK_SERVER_STATUS 238
#define TK_CURRENT_USER 239
#define TK_CASE 240
#define TK_WHEN 241
#define TK_THEN 242
#define TK_ELSE 243
#define TK_BETWEEN 244
#define TK_IS 245
#define TK_NK_LT 246
#define TK_NK_GT 247
#define TK_NK_LE 248
#define TK_NK_GE 249
#define TK_NK_NE 250
#define TK_MATCH 251
#define TK_NMATCH 252
#define TK_CONTAINS 253
#define TK_IN 254
#define TK_JOIN 255
#define TK_INNER 256
#define TK_SELECT 257
#define TK_DISTINCT 258
#define TK_WHERE 259
#define TK_PARTITION 260
#define TK_BY 261
#define TK_SESSION 262
#define TK_STATE_WINDOW 263
#define TK_EVENT_WINDOW 264
#define TK_SLIDING 265
#define TK_FILL 266
#define TK_VALUE 267
#define TK_VALUE_F 268
#define TK_NONE 269
#define TK_PREV 270
#define TK_NULL_F 271
#define TK_LINEAR 272
#define TK_NEXT 273
#define TK_HAVING 274
#define TK_RANGE 275
#define TK_EVERY 276
#define TK_ORDER 277
#define TK_SLIMIT 278
#define TK_SOFFSET 279
#define TK_LIMIT 280
#define TK_OFFSET 281
#define TK_ASC 282
#define TK_NULLS 283
#define TK_ABORT 284
#define TK_AFTER 285
#define TK_ATTACH 286
#define TK_BEFORE 287
#define TK_BEGIN 288
#define TK_BITAND 289
#define TK_BITNOT 290
#define TK_BITOR 291
#define TK_BLOCKS 292
#define TK_CHANGE 293
#define TK_COMMA 294
#define TK_CONCAT 295
#define TK_CONFLICT 296
#define TK_COPY 297
#define TK_DEFERRED 298
#define TK_DELIMITERS 299
#define TK_DETACH 300
#define TK_DIVIDE 301
#define TK_DOT 302
#define TK_EACH 303
#define TK_FAIL 304
#define TK_FILE 305
#define TK_FOR 306
#define TK_GLOB 307
#define TK_ID 308
#define TK_IMMEDIATE 309
#define TK_IMPORT 310
#define TK_INITIALLY 311
#define TK_INSTEAD 312
#define TK_ISNULL 313
#define TK_KEY 314
#define TK_MODULES 315
#define TK_NK_BITNOT 316
#define TK_NK_SEMI 317
#define TK_NOTNULL 318
#define TK_OF 319
#define TK_PLUS 320
#define TK_PRIVILEGE 321
#define TK_RAISE 322
#define TK_RESTRICT 323
#define TK_ROW 324
#define TK_SEMI 325
#define TK_STAR 326
#define TK_STATEMENT 327
#define TK_STRICT 328
#define TK_STRING 329
#define TK_TIMES 330
#define TK_VALUES 331
#define TK_VARIABLE 332
#define TK_VIEW 333
#define TK_WAL 334
#define TK_TRIGGER 198
#define TK_AT_ONCE 199
#define TK_WINDOW_CLOSE 200
#define TK_IGNORE 201
#define TK_EXPIRED 202
#define TK_FILL_HISTORY 203
#define TK_UPDATE 204
#define TK_SUBTABLE 205
#define TK_KILL 206
#define TK_CONNECTION 207
#define TK_TRANSACTION 208
#define TK_BALANCE 209
#define TK_VGROUP 210
#define TK_LEADER 211
#define TK_MERGE 212
#define TK_REDISTRIBUTE 213
#define TK_SPLIT 214
#define TK_DELETE 215
#define TK_INSERT 216
#define TK_NULL 217
#define TK_NK_QUESTION 218
#define TK_NK_ARROW 219
#define TK_ROWTS 220
#define TK_QSTART 221
#define TK_QEND 222
#define TK_QDURATION 223
#define TK_WSTART 224
#define TK_WEND 225
#define TK_WDURATION 226
#define TK_IROWTS 227
#define TK_ISFILLED 228
#define TK_CAST 229
#define TK_NOW 230
#define TK_TODAY 231
#define TK_TIMEZONE 232
#define TK_CLIENT_VERSION 233
#define TK_SERVER_VERSION 234
#define TK_SERVER_STATUS 235
#define TK_CURRENT_USER 236
#define TK_CASE 237
#define TK_WHEN 238
#define TK_THEN 239
#define TK_ELSE 240
#define TK_BETWEEN 241
#define TK_IS 242
#define TK_NK_LT 243
#define TK_NK_GT 244
#define TK_NK_LE 245
#define TK_NK_GE 246
#define TK_NK_NE 247
#define TK_MATCH 248
#define TK_NMATCH 249
#define TK_CONTAINS 250
#define TK_IN 251
#define TK_JOIN 252
#define TK_INNER 253
#define TK_SELECT 254
#define TK_DISTINCT 255
#define TK_WHERE 256
#define TK_PARTITION 257
#define TK_BY 258
#define TK_SESSION 259
#define TK_STATE_WINDOW 260
#define TK_EVENT_WINDOW 261
#define TK_SLIDING 262
#define TK_FILL 263
#define TK_VALUE 264
#define TK_VALUE_F 265
#define TK_NONE 266
#define TK_PREV 267
#define TK_NULL_F 268
#define TK_LINEAR 269
#define TK_NEXT 270
#define TK_HAVING 271
#define TK_RANGE 272
#define TK_EVERY 273
#define TK_ORDER 274
#define TK_SLIMIT 275
#define TK_SOFFSET 276
#define TK_LIMIT 277
#define TK_OFFSET 278
#define TK_ASC 279
#define TK_NULLS 280
#define TK_ABORT 281
#define TK_AFTER 282
#define TK_ATTACH 283
#define TK_BEFORE 284
#define TK_BEGIN 285
#define TK_BITAND 286
#define TK_BITNOT 287
#define TK_BITOR 288
#define TK_BLOCKS 289
#define TK_CHANGE 290
#define TK_COMMA 291
#define TK_CONCAT 292
#define TK_CONFLICT 293
#define TK_COPY 294
#define TK_DEFERRED 295
#define TK_DELIMITERS 296
#define TK_DETACH 297
#define TK_DIVIDE 298
#define TK_DOT 299
#define TK_EACH 300
#define TK_FAIL 301
#define TK_FILE 302
#define TK_FOR 303
#define TK_GLOB 304
#define TK_ID 305
#define TK_IMMEDIATE 306
#define TK_IMPORT 307
#define TK_INITIALLY 308
#define TK_INSTEAD 309
#define TK_ISNULL 310
#define TK_KEY 311
#define TK_MODULES 312
#define TK_NK_BITNOT 313
#define TK_NK_SEMI 314
#define TK_NOTNULL 315
#define TK_OF 316
#define TK_PLUS 317
#define TK_PRIVILEGE 318
#define TK_RAISE 319
#define TK_RESTRICT 320
#define TK_ROW 321
#define TK_SEMI 322
#define TK_STAR 323
#define TK_STATEMENT 324
#define TK_STRICT 325
#define TK_STRING 326
#define TK_TIMES 327
#define TK_VALUES 328
#define TK_VARIABLE 329
#define TK_VIEW 330
#define TK_WAL 331
#define TK_NK_SPACE 600
#define TK_NK_COMMENT 601

View File

@ -132,7 +132,7 @@ typedef struct {
} SWalRef;
typedef struct {
// int8_t scanUncommited;
int8_t scanUncommited;
int8_t scanNotApplied;
int8_t scanMeta;
int8_t enableRef;

View File

@ -1170,6 +1170,7 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
}
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
taosThreadMutexLock(&pTscObj->mutex);
if (pTscObj->passInfo.fp) {
atomic_sub_fetch_32(&pAppHbMgr->passKeyCnt, 1);
@ -1178,4 +1179,6 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
}
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }
void taos_set_hb_quit(int8_t quitByKill) {
clientHbMgr.quitByKill = quitByKill;
}

View File

@ -679,7 +679,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
SField *field = taosArrayGet(results, j);
len += field->bytes;
}
if (len > maxLen) {
if(len > maxLen){
return isTag ? TSDB_CODE_PAR_INVALID_TAGS_LENGTH : TSDB_CODE_PAR_INVALID_ROW_LENGTH;
}
@ -1586,9 +1586,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
do {
code = smlModifyDBSchemas(info);
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS ||
code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH ||
code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) {
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
|| code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) {
break;
}
taosMsleep(100);
@ -1614,7 +1614,7 @@ void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawL
if (tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
int32_t len = 0;
int32_t rlen = 0;
char *p = NULL;
char* p = NULL;
if (lines && lines[0]) {
len = strlen(lines[0]);

View File

@ -1478,13 +1478,8 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
char buf[80];
<<<<<<< HEAD
tFormatOffset(buf, 80, &pVgCur->currentOffset);
tscDebug("consumer:0x%" PRIx64 ", doUpdateLocalEp current vg, epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, tmq->epoch, pVgCur->vgId,
=======
tFormatOffset(buf, 80, &pVgCur->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
>>>>>>> enh/3.0
vgKey, buf);
SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows};
@ -1693,12 +1688,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
return handleErrorBeforePoll(pVg, pTmq);
}
<<<<<<< HEAD
sendInfo->msgInfo = (SDataBuf){ .pData = msg, .len = msgSize, .handle = NULL };
=======
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
>>>>>>> enh/3.0
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
@ -1816,16 +1806,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset;
}
<<<<<<< HEAD
if(pDataRsp->rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pDataRsp->rspOffset; // update the local offset value only for the returned values.
}
=======
// update the local offset value only for the returned values.
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
// update the status
>>>>>>> enh/3.0
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
@ -1869,13 +1853,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
<<<<<<< HEAD
if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
}
=======
pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
>>>>>>> enh/3.0
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
@ -1893,13 +1873,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle;
<<<<<<< HEAD
if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate
pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset;
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
}
=======
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
>>>>>>> enh/3.0
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->taosxRsp.blockNum == 0) {

View File

@ -1080,7 +1080,6 @@ TEST(clientCase, sub_tb_test) {
ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new();
<<<<<<< HEAD
int32_t ts = taosGetTimestampMs()%INT32_MAX;
char consumerGroupid[128] = {0};
@ -1089,11 +1088,6 @@ TEST(clientCase, sub_tb_test) {
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", consumerGroupid);
=======
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName1024");
>>>>>>> enh/3.0
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
@ -1137,13 +1131,9 @@ TEST(clientCase, sub_tb_test) {
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
<<<<<<< HEAD
if (pRes) {
char buf[128];
=======
if (pRes != NULL) {
>>>>>>> enh/3.0
const char* topicName = tmq_get_topic_name(pRes);
// const char* dbName = tmq_get_db_name(pRes);
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
@ -1152,26 +1142,7 @@ TEST(clientCase, sub_tb_test) {
// printf("db: %s\n", dbName);
// printf("vgroup id: %d\n", vgroupId);
<<<<<<< HEAD
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) {
break;
}
fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes);
totalRows += 1;
// if (totalRows % 100000 == 0) {
taos_print_row(buf, row, fields, numOfFields);
printf("row content: %s\n", buf);
// }
}
taos_free_result(pRes);
=======
printSubResults(pRes, &totalRows);
>>>>>>> enh/3.0
} else {
// tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin);
// break;
@ -1180,6 +1151,11 @@ TEST(clientCase, sub_tb_test) {
tmq_commit_sync(tmq, pRes);
if (pRes != NULL) {
taos_free_result(pRes);
// if ((++count) > 1) {
// break;
// }
} else {
break;
}
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, pAssign[0].begin);

View File

@ -212,7 +212,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
"learnerReplica:%d learnerSelfIndex:%d strict:%d",
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
(uint64_t)req.buffer * 1024 * 1024,
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid,
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression,
@ -224,7 +224,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
req.replicas[i].id);
}
for (int32_t i = 0; i < req.learnerReplica; ++i) {
dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn,
req.learnerReplicas[i].port, req.replicas[i].id);
}
@ -278,7 +278,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
}
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&req);
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
@ -326,7 +326,7 @@ _OVER:
vnodeClose(pImpl);
vnodeDestroy(path, pMgmt->pTfs);
} else {
dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created",
dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created",
req.vgId, TMSG_INFO(pMsg->msgType));
}
@ -346,7 +346,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
req.learnerSelfIndex = -1;
}
dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request",
dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request",
req.vgId, TMSG_INFO(pMsg->msgType));
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
@ -386,8 +386,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
}
if (req.replica <= 0 ||
(req.selfIndex < 0 && req.learnerSelfIndex <0)||
if (req.replica <= 0 ||
(req.selfIndex < 0 && req.learnerSelfIndex <0)||
req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId);
@ -402,7 +402,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
else{
pReplica = &req.learnerReplicas[req.learnerSelfIndex];
}
if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
@ -447,7 +447,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered",
req.vgId, TMSG_INFO(pMsg->msgType));
return 0;
}
@ -552,7 +552,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vgId = alterReq.vgId;
dInfo("vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d",
"learnerSelfIndex:%d strict:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
for (int32_t i = 0; i < alterReq.replica; ++i) {
@ -563,9 +563,9 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SReplica *pReplica = &alterReq.learnerReplicas[i];
dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
}
if (alterReq.replica <= 0 ||
(alterReq.selfIndex < 0 && alterReq.learnerSelfIndex <0)||
if (alterReq.replica <= 0 ||
(alterReq.selfIndex < 0 && alterReq.learnerSelfIndex <0)||
alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId);
@ -579,7 +579,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
else{
pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
}
if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
strcmp(pReplica->fqdn, tsLocalFqdn) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
@ -631,7 +631,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
dInfo("vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d",
"learnerSelfIndex:%d strict:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
return 0;
@ -711,12 +711,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK_TO_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
<<<<<<< HEAD
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
=======
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
>>>>>>> enh/3.0
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -816,6 +816,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
SMqConsumerObj *pConsumer = NULL;
void *buf = NULL;
terrno = 0;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
goto CM_DECODE_OVER;

View File

@ -298,7 +298,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
}
if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){
createReq.replica++;
createReq.replica++;
}
else{
createReq.learnerReplica++;
@ -310,14 +310,14 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
return NULL;
}
mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica,
createReq.learnerReplica, createReq.strict);
for (int32_t i = 0; i < createReq.replica; ++i) {
mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port);
}
for (int32_t i = 0; i < createReq.learnerReplica; ++i) {
mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn,
createReq.learnerReplicas[i].port);
}
@ -397,13 +397,13 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){
pReplica = &alterReq.replicas[alterReq.replica];
alterReq.replica++;
alterReq.replica++;
}
else{
pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica];
alterReq.learnerReplica++;
}
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) return NULL;
@ -425,14 +425,14 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p
}
}
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d",
alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict);
for (int32_t i = 0; i < alterReq.replica; ++i) {
mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port);
}
for (int32_t i = 0; i < alterReq.learnerReplica; ++i) {
mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i,
mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i,
alterReq.learnerReplicas[i].fqdn, alterReq.learnerReplicas[i].port);
}
@ -1296,7 +1296,7 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
return 0;
}
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
SDnodeObj *pDnode) {
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
@ -2023,7 +2023,7 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra
}
else
{
mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d",
mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d",
pTrans->id, vgid, dnodeId, exist, online);
}
@ -2158,7 +2158,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
return 0;
}
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup,
int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup,
SDnodeObj *pDnode) {
SVgObj newVgroup = {0};
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
@ -2169,7 +2169,7 @@ int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
if(newVgroup.replica == 1){
int selected = 0;
for(int i = 0; i < newVgroup.replica; i++){
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){
selected = i;
}
@ -2183,12 +2183,12 @@ int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
}
else{
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
}
}
}
if (mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode) != 0) return -1;
for(int i = 0; i < newVgroup.replica; i++){
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){
}
}
@ -2211,8 +2211,6 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
return 0;
}
<<<<<<< HEAD
=======
static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) {
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err;
@ -2225,7 +2223,6 @@ _err:
return -1;
}
>>>>>>> enh/3.0
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1;
STrans *pTrans = NULL;
@ -2534,11 +2531,11 @@ _OVER:
bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; }
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) {
for(int i = 0; i < pVgroup->replica; i++){
if(pVgroup->vnodeGid[i].dnodeId == dnodeId) return true;
}
return false;
return false;
}
static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs,

View File

@ -105,13 +105,6 @@ typedef struct {
int8_t exec;
} STqHandle;
typedef struct {
SMqDataRsp* pDataRsp;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
SRpcHandleInfo info;
STqHandle* pHandle;
} STqPushEntry;
struct STQ {
SVnode* pVnode;
char* path;
@ -149,14 +142,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
<<<<<<< HEAD
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle);
=======
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId);
>>>>>>> enh/3.0
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type,
int32_t vgId);
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
// tqMeta
int32_t tqMetaOpen(STQ* pTq);
@ -193,16 +181,9 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
<<<<<<< HEAD
=======
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
void initOffsetForAllRestoreTasks(STQ* pTq);
>>>>>>> enh/3.0
bool tqIsHandleExecuting(STqHandle* pHandle);
#ifdef __cplusplus
}
#endif

View File

@ -195,16 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
<<<<<<< HEAD
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
=======
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
int32_t type);
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
>>>>>>> enh/3.0
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -71,21 +71,10 @@ static void destroyTqHandle(void* data) {
walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pTqReader);
}
<<<<<<< HEAD
if(pData->msg != NULL) {
rpcFreeCont(pData->msg->pCont);
taosMemoryFree(pData->msg);
pData->msg = NULL;
=======
}
static void tqPushEntryFree(void* data) {
STqPushEntry* p = *(void**)data;
if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
tDeleteMqDataRsp(p->pDataRsp);
} else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
>>>>>>> enh/3.0
}
}
@ -162,7 +151,6 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq);
}
<<<<<<< HEAD
void tqNotifyClose(STQ* pTq) {
if (pTq != NULL) {
taosWLockLatch(&pTq->pStreamMeta->lock);
@ -194,7 +182,7 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_RSP) {
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
}
@ -219,7 +207,7 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_RSP) {
tEncodeSMqDataRsp(&encoder, pRsp);
tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
}
@ -237,33 +225,22 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
return 0;
}
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) {
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqDataRsp dataRsp = {0};
dataRsp.head.consumerId = pHandle->consumerId;
dataRsp.head.epoch = pHandle->epoch;
dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP);
=======
int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId) {
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pPushEntry->pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType, sver, ever);
>>>>>>> enh/3.0
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever);
char buf1[80] = {0};
char buf2[80] = {0};
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
<<<<<<< HEAD
TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
=======
vgId, pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
>>>>>>> enh/3.0
vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
return 0;
}
@ -462,7 +439,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
consumerId, vgId, req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->lock);
return -1;
@ -521,6 +498,8 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
@ -545,8 +524,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
dataRsp.rspOffset.version = sver;
if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = currentVer; // return current consume offset value
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever;
} else {
@ -564,8 +545,9 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
int32_t code = 0;
// taosWLockLatch(&pTq->lock);
// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
@ -580,6 +562,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
}
while (tqIsHandleExecuting(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (code != 0) {
tqError("cannot process tq delete req %s, since no such handle", pReq->subKey);
@ -940,11 +928,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
}
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code;
#if 0
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
if (code < 0) return code;
#endif
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
if (tsDisableStream) {
return 0;
}
@ -970,7 +956,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWLockLatch(&pTq->pStreamMeta->lock);
code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask);
if (code < 0) {
tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr,
tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr,
streamMetaGetNumOfTasks(pTq->pStreamMeta));
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1;
@ -983,8 +969,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamTaskCheckDownstream(pTask, sversion);
}
tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode),
pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr,
pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta));
return 0;
}

View File

@ -16,261 +16,6 @@
#include "tq.h"
#include "vnd.h"
<<<<<<< HEAD
=======
#if 0
void tqTmrRspFunc(void* param, void* tmrId) {
STqHandle* pHandle = (STqHandle*)param;
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
}
static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) {
SStreamDataSubmit* pSubmit = *ppSubmit;
while (pSubmit != NULL) {
if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) {
}
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitDestroy(pSubmit);
if (pRsp->blockNum > 0) {
*ppSubmit = pSubmit;
return 0;
} else {
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
}
}
*ppSubmit = pSubmit;
return -1;
}
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
SMqDataRsp rsp = {0};
// 1. guard and set status executing
int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE,
TASK_EXEC_STATUS__EXECUTING);
if (execStatus == TASK_EXEC_STATUS__IDLE) {
SStreamDataSubmit* pSubmit = NULL;
// 2. check processedVer
// 2.1. if not missed, get msg from queue
// 2.2. if missed, scan wal
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
while (pHandle->pushHandle.processedVer <= pSubmit->ver) {
// read from wal
}
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitDestroy(pSubmit);
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
if (pSubmit == NULL) break;
}
// 3. exec, after each success, update processed ver
// first run
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
goto SEND_RSP;
}
// set exec status closing
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING);
// second run
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
goto SEND_RSP;
}
// set exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
}
SEND_RSP:
// 4. if get result
// 4.1 set exec input status blocked and exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
// 4.2 rpc send
rsp.rspOffset = pHandle->pushHandle.processedVer;
/*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/
/*return -1;*/
/*}*/
// 4.3 clear rpc info
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
return 0;
}
int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) {
memset(&pHandle->pushHandle, 0, sizeof(STqPushHandle));
pHandle->pushHandle.inputQ.queue = taosOpenQueue();
pHandle->pushHandle.inputQ.qall = taosAllocateQall();
if (pHandle->pushHandle.inputQ.queue == NULL || pHandle->pushHandle.inputQ.qall == NULL) {
if (pHandle->pushHandle.inputQ.queue) {
taosCloseQueue(pHandle->pushHandle.inputQ.queue);
}
if (pHandle->pushHandle.inputQ.qall) {
taosFreeQall(pHandle->pushHandle.inputQ.qall);
}
return -1;
}
return 0;
}
int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer,
int64_t timeout) {
memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo));
atomic_store_64(&pHandle->pushHandle.reqId, reqId);
atomic_store_64(&pHandle->pushHandle.processedVer, processedVer);
atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL);
atomic_store_8(&pHandle->pushHandle.tmrStopped, 0);
taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId);
return 0;
}
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit);
if (pSubmitClone == NULL) {
return -1;
}
taosWriteQitem(pHandle->pushHandle.inputQ.queue, pSubmitClone);
return 0;
}
return -1;
}
int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
//
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
STqHandle* pHandle = NULL;
SSubmitReq* pReq = (SSubmitReq*)msg;
int32_t workerId = 4;
int64_t fetchOffset = ver;
while (1) {
pIter = taosHashIterate(pTq->pushMgr, pIter);
if (pIter == NULL) break;
pHandle = *(STqHandle**)pIter;
taosWLockLatch(&pHandle->pushHandle.lock);
SMqDataRsp rsp = {0};
rsp.reqOffset = pHandle->pushHandle.reqOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (msgType == TDMT_VND_SUBMIT) {
tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
} else {
tqError("tq push unexpected msg type %d", msgType);
}
if (rsp.blockNum == 0) {
taosWUnLockLatch(&pHandle->pushHandle.lock);
continue;
}
rsp.rspOffset = fetchOffset;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
// todo free
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch;
((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
SRpcMsg resp = {
.info = pHandle->pushHandle.rpcInfo,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64,
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
rsp.reqOffset, rsp.rspOffset);
// TODO destroy
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
}
return 0;
}
#endif
typedef struct {
void* pKey;
int64_t keyLen;
} SItem;
static void recordPushedEntry(SArray* cachedKey, void* pIter);
static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq);
static void freeItem(void* param) {
SItem* p = (SItem*)param;
taosMemoryFree(p->pKey);
}
static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData,
int32_t dataLen, SArray* pCachedKey) {
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
if (pRsp->reqOffset.version >= ver) {
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
pRsp->reqOffset.version, ver);
return;
}
qTaskInfo_t pTaskInfo = pExec->task;
// prepare scan mem data
SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver};
if (qStreamSetScanMemData(pTaskInfo, submit) != 0) {
return;
}
qStreamSetOpen(pTaskInfo);
// here start to scan submit block to extract the subscribed data
int32_t totalRows = 0;
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) {
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
}
if (pDataBlock == NULL) {
break;
}
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision);
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
}
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum,
totalRows);
if (pRsp->blockNum > 0) {
tqOffsetResetToLog(&pRsp->rspOffset, ver);
tqPushDataRsp(pPushEntry, vgId);
recordPushedEntry(pCachedKey, pIter);
}
}
>>>>>>> enh/3.0
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
if (msgType == TDMT_VND_SUBMIT) {
@ -310,42 +55,17 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
} else {
tqPushDataRsp(pTq, pHandle);
tqPushDataRsp(pHandle, vgId);
void* tmp = pHandle->msg->pCont;
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = tmp;
}
<<<<<<< HEAD
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
pHandle->msg->contLen = pMsg->contLen;
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
=======
pPushEntry->pHandle = pHandle;
pPushEntry->info = pRpcMsg->info;
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp));
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp));
} else if (type == TMQ_MSG_TYPE__POLL_RSP) {
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp));
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp));
}
SMqRspHead* pHead = &pPushEntry->pDataRsp->head;
pHead->consumerId = consumerId;
pHead->epoch = pRequest->epoch;
pHead->mqMsgType = type;
taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d",
consumerId, pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr));
>>>>>>> enh/3.0
return 0;
}
@ -357,19 +77,11 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) {
tqPushDataRsp(pTq, pHandle);
tqPushDataRsp(pHandle, vgId);
<<<<<<< HEAD
rpcFreeCont(pHandle->msg->pCont);
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
=======
if (rspConsumer) { // rsp the old consumer with empty block.
tqPushDataRsp(*pEntry, vgId);
}
taosHashRemove(pTq->pPushMgr, pKey, keyLen);
>>>>>>> enh/3.0
}
return 0;

View File

@ -41,80 +41,7 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI
return TSDB_CODE_SUCCESS;
}
<<<<<<< HEAD
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
=======
void initOffsetForAllRestoreTasks(STQ* pTq) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, since not ready, status %d", pTask->id.idStr, pTask->status.taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version);
}
}
}
void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
void* pIter = NULL;
while(1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr,
pTask->status.taskStatus);
continue;
}
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, ver);
}
}
}
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) {
STqOffset offset = {0};
tqOffsetResetToLog(&offset.val, ver);
tstrncpy(offset.subKey, pKey, tListLen(offset.subKey));
// keep the offset info in the offset store
tqOffsetWrite(pOffsetStore, &offset);
}
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
>>>>>>> enh/3.0
pRsp->reqOffset = pReq->reqOffset;
pRsp->blockData = taosArrayInit(0, sizeof(void*));
@ -235,60 +162,56 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
static bool isHandleExecuting(STqHandle* pHandle){
return 1 == atomic_load_8(&pHandle->exec);
}
bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); }
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
char buf[80] = {0};
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
int code = 0;
SMqDataRsp dataRsp = {0};
<<<<<<< HEAD
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
tqInitDataRsp(&dataRsp, pRequest);
// qTaskInfo_t task = pHandle->execHandle.task;
// if (qTaskIsExecuting(task)) {
// code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
// tDeleteMqDataRsp(&dataRsp);
// return code;
// }
while(isHandleExecuting(pHandle)){
tqInfo("sub is executing, pHandle:%p", pHandle);
// todo add more status check to avoid race condition
while (tqIsHandleExecuting(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
atomic_store_8(&pHandle->exec, 1);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) {
if (code != 0) {
goto end;
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data
if (pHandle->noDataPollCnt >= NO_POLL_CNT) { // send poll result to client if no data 5 times to avoid lost data
pHandle->noDataPollCnt = 0;
// lock
taosWLockLatch(&pTq->lock);
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
tDeleteSMqDataRsp(&dataRsp);
tDeleteMqDataRsp(&dataRsp);
atomic_store_8(&pHandle->exec, 0);
return code;
}
else{
} else {
pHandle->noDataPollCnt++;
}
}
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
end:
{
@ -298,8 +221,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
tDeleteMqDataRsp(&dataRsp);
}
atomic_store_8(&pHandle->exec, 0);
atomic_store_8(&pHandle->exec, 0);
return code;
}
@ -310,17 +233,18 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
// qTaskInfo_t task = pHandle->execHandle.task;
// if(qTaskIsExecuting(task)){
// code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
// tDeleteSTaosxRsp(&taosxRsp);
// return code;
// }
while(isHandleExecuting(pHandle)){
tqInfo("sub is executing, pHandle:%p", pHandle);
while (tqIsHandleExecuting(pHandle)) {
tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey);
taosMsleep(5);
}
atomic_store_8(&pHandle->exec, 1);
if (offset->type != TMQ_OFFSET__LOG) {
@ -340,7 +264,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
}else {
*offset = taosxRsp.rspOffset;
@ -356,6 +280,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
code = -1;
goto end;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
int totalRows = 0;
while (1) {
@ -369,7 +294,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
}
@ -381,7 +306,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if(totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
}
@ -390,7 +315,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
goto end;
}
@ -409,7 +334,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
} else {
fetchVer++;

View File

@ -2325,7 +2325,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
@ -2493,7 +2493,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {

View File

@ -302,12 +302,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) {
return rowsNum;
}
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) {
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj* pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) {
void *p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
void* p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) {
pTbData = pTbData->next;
continue;

View File

@ -250,6 +250,7 @@ typedef struct STagScanInfo {
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SLimitNode* pSlimit;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
} STagScanInfo;

View File

@ -68,8 +68,8 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=false;make -j || exit 1"
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1"
# -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
echo "delete ${WORKDIR}/debugNoSan"
@ -97,11 +97,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
<<<<<<< HEAD
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=false -DPORTABLE=true -DWITH_GFLAGS=false;make -j || exit 1 "
=======
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1 "
>>>>>>> enh/3.0
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan

View File

@ -657,33 +657,36 @@ if $data20 != null then
return -1
endi
print =============== error for normal table
sql create table tb2023(ts timestamp, f int);
sql_error alter table tb2023 add column v varchar(16375);
sql_error alter table tb2023 add column v varchar(16385);
sql_error alter table tb2023 add column v varchar(33100);
sql alter table tb2023 add column v varchar(16374);
sql_error alter table tb2023 modify column v varchar(16375);
sql desc tb2023
sql alter table tb2023 drop column v
sql_error alter table tb2023 add column v nchar(4094);
sql alter table tb2023 add column v nchar(4093);
sql_error alter table tb2023 modify column v nchar(4094);
sql desc tb2023
print =============== error for super table
sql create table stb2023(ts timestamp, f int) tags(t1 int);
sql_error alter table stb2023 add column v varchar(16375);
sql_error alter table stb2023 add column v varchar(16385);
sql_error alter table stb2023 add column v varchar(33100);
sql alter table stb2023 add column v varchar(16374);
sql_error alter table stb2023 modify column v varchar(16375);
sql desc stb2023
sql alter table stb2023 drop column v
sql_error alter table stb2023 add column v nchar(4094);
sql alter table stb2023 add column v nchar(4093);
sql_error alter table stb2023 modify column v nchar(4094);
sql desc stb2023
#print =============== error for normal table
#sql create table tb2023(ts timestamp, f int);
#sql_error alter table tb2023 add column v varchar(65535);
#sql_error alter table tb2023 add column v varchar(65535);
#sql_error alter table tb2023 add column v varchar(65530);
#sql alter table tb2023 add column v varchar(16374);
#sql_error alter table tb2023 modify column v varchar(65536);
#sql desc tb2023
#sql alter table tb2023 drop column v
#sql_error alter table tb2023 add column v nchar(16384);
#sql alter table tb2023 add column v nchar(4093);
#sql_error alter table tb2023 modify column v nchar(16384);
#sql_error alter table tb2023 add column v nchar(16384);
#sql alter table tb2023 drop column v
#sql alter table tb2023 add column v nchar(16374);
#sql desc tb2023
#
#print =============== error for super table
#sql create table stb2023(ts timestamp, f int) tags(t1 int);
#sql_error alter table stb2023 add column v varchar(65535);
#sql_error alter table stb2023 add column v varchar(65536);
#sql_error alter table stb2023 add column v varchar(33100);
#sql alter table stb2023 add column v varchar(16374);
#sql_error alter table stb2023 modify column v varchar(16375);
#sql desc stb2023
#sql alter table stb2023 drop column v
#sql_error alter table stb2023 add column v nchar(4094);
#sql alter table stb2023 add column v nchar(4093);
#sql_error alter table stb2023 modify column v nchar(4094);
#sql desc stb2023
print ======= over
sql drop database d1

View File

@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10);
sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(0);
sql_error alter table tb modify column c2 binary(17000);
sql_error alter table tb modify column c2 binary(65600);
sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 nchar(10);

View File

@ -989,7 +989,7 @@ int sml_ts2164_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes =
taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test BUFFER 384 MINROWS 1000 PAGES 256 PRECISION 'ns'");
taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test MINROWS 1000 PRECISION 'ns'");
taos_free_result(pRes);
const char *sql[] = {
@ -1139,8 +1139,8 @@ int sml_td23881_Test() {
taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_23881 PRECISION 'ns'");
taos_free_result(pRes);
char tmp[16375] = {0};
memset(tmp, 'a', 16374);
char tmp[26375] = {0};
memset(tmp, 'a', 26374);
char sql[102400] = {0};
sprintf(sql,"lujixfvqor,t0=t c0=f,c1=\"%s\",c2=\"%s\",c3=\"%s\",c4=\"wthvqxcsrlps\" 1626006833639000000", tmp, tmp, tmp);
@ -1385,8 +1385,8 @@ int main(int argc, char *argv[]) {
ASSERT(!ret);
ret = sml_ts3116_Test();
ASSERT(!ret);
ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
ASSERT(!ret);
// ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
// ASSERT(!ret);
ret = sml_ts3303_Test(); // this test case need config sml table name using ./sml_test config_file
ASSERT(!ret);