diff --git a/docs/en/05-get-started/03-package.md b/docs/en/05-get-started/03-package.md index 8f9cb9aedc..b47855103c 100644 --- a/docs/en/05-get-started/03-package.md +++ b/docs/en/05-get-started/03-package.md @@ -102,7 +102,7 @@ sudo apt-get install tdengine :::tip This installation method is supported only for Debian and Ubuntu. -:::: +::: diff --git a/docs/zh/27-train-faq/01-faq.md b/docs/zh/27-train-faq/01-faq.md index b000619630..9e82ea0af0 100644 --- a/docs/zh/27-train-faq/01-faq.md +++ b/docs/zh/27-train-faq/01-faq.md @@ -77,7 +77,7 @@ description: 一些常见问题的解决方法汇总 - Windows 系统请使用 PowerShell 命令 Test-NetConnection -ComputerName {fqdn} -Port {port} 检测服务段端口是否访问 -11. 也可以使用 taos 程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅:[诊断及其他](https://docs.taosdata.com/3.0-preview/operation/diagnose/)。 +11. 也可以使用 taos 程序内嵌的网络连通检测功能,来验证服务器和客户端之间指定的端口连接是否通畅:[诊断及其他](../../operation/diagnose/)。 ### 5. 遇到错误 Unable to resolve FQDN” 怎么办? diff --git a/include/client/taos.h b/include/client/taos.h index 79aab247ab..d9fd1ca1b8 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -232,7 +232,8 @@ DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill); -DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type); + +DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type); /* --------------------------schemaless INTERFACE------------------------------- */ diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 363d95e1f7..95c548977c 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -213,143 +213,141 @@ #define TK_REPLACE 195 #define TK_STREAM 196 #define TK_INTO 197 -#define TK_PAUSE 198 -#define TK_RESUME 199 -#define TK_TRIGGER 200 -#define TK_AT_ONCE 201 -#define TK_WINDOW_CLOSE 202 -#define TK_IGNORE 203 -#define TK_EXPIRED 204 -#define TK_FILL_HISTORY 205 -#define TK_UPDATE 206 -#define TK_SUBTABLE 207 -#define TK_UNTREATED 208 -#define TK_KILL 209 -#define TK_CONNECTION 210 -#define TK_TRANSACTION 211 -#define TK_BALANCE 212 -#define TK_VGROUP 213 -#define TK_LEADER 214 -#define TK_MERGE 215 -#define TK_REDISTRIBUTE 216 -#define TK_SPLIT 217 -#define TK_DELETE 218 -#define TK_INSERT 219 -#define TK_NULL 220 -#define TK_NK_QUESTION 221 -#define TK_NK_ARROW 222 -#define TK_ROWTS 223 -#define TK_QSTART 224 -#define TK_QEND 225 -#define TK_QDURATION 226 -#define TK_WSTART 227 -#define TK_WEND 228 -#define TK_WDURATION 229 -#define TK_IROWTS 230 -#define TK_ISFILLED 231 -#define TK_CAST 232 -#define TK_NOW 233 -#define TK_TODAY 234 -#define TK_TIMEZONE 235 -#define TK_CLIENT_VERSION 236 -#define TK_SERVER_VERSION 237 -#define TK_SERVER_STATUS 238 -#define TK_CURRENT_USER 239 -#define TK_CASE 240 -#define TK_WHEN 241 -#define TK_THEN 242 -#define TK_ELSE 243 -#define TK_BETWEEN 244 -#define TK_IS 245 -#define TK_NK_LT 246 -#define TK_NK_GT 247 -#define TK_NK_LE 248 -#define TK_NK_GE 249 -#define TK_NK_NE 250 -#define TK_MATCH 251 -#define TK_NMATCH 252 -#define TK_CONTAINS 253 -#define TK_IN 254 -#define TK_JOIN 255 -#define TK_INNER 256 -#define TK_SELECT 257 -#define TK_DISTINCT 258 -#define TK_WHERE 259 -#define TK_PARTITION 260 -#define TK_BY 261 -#define TK_SESSION 262 -#define TK_STATE_WINDOW 263 -#define TK_EVENT_WINDOW 264 -#define TK_SLIDING 265 -#define TK_FILL 266 -#define TK_VALUE 267 -#define TK_VALUE_F 268 -#define TK_NONE 269 -#define TK_PREV 270 -#define TK_NULL_F 271 -#define TK_LINEAR 272 -#define TK_NEXT 273 -#define TK_HAVING 274 -#define TK_RANGE 275 -#define TK_EVERY 276 -#define TK_ORDER 277 -#define TK_SLIMIT 278 -#define TK_SOFFSET 279 -#define TK_LIMIT 280 -#define TK_OFFSET 281 -#define TK_ASC 282 -#define TK_NULLS 283 -#define TK_ABORT 284 -#define TK_AFTER 285 -#define TK_ATTACH 286 -#define TK_BEFORE 287 -#define TK_BEGIN 288 -#define TK_BITAND 289 -#define TK_BITNOT 290 -#define TK_BITOR 291 -#define TK_BLOCKS 292 -#define TK_CHANGE 293 -#define TK_COMMA 294 -#define TK_CONCAT 295 -#define TK_CONFLICT 296 -#define TK_COPY 297 -#define TK_DEFERRED 298 -#define TK_DELIMITERS 299 -#define TK_DETACH 300 -#define TK_DIVIDE 301 -#define TK_DOT 302 -#define TK_EACH 303 -#define TK_FAIL 304 -#define TK_FILE 305 -#define TK_FOR 306 -#define TK_GLOB 307 -#define TK_ID 308 -#define TK_IMMEDIATE 309 -#define TK_IMPORT 310 -#define TK_INITIALLY 311 -#define TK_INSTEAD 312 -#define TK_ISNULL 313 -#define TK_KEY 314 -#define TK_MODULES 315 -#define TK_NK_BITNOT 316 -#define TK_NK_SEMI 317 -#define TK_NOTNULL 318 -#define TK_OF 319 -#define TK_PLUS 320 -#define TK_PRIVILEGE 321 -#define TK_RAISE 322 -#define TK_RESTRICT 323 -#define TK_ROW 324 -#define TK_SEMI 325 -#define TK_STAR 326 -#define TK_STATEMENT 327 -#define TK_STRICT 328 -#define TK_STRING 329 -#define TK_TIMES 330 -#define TK_VALUES 331 -#define TK_VARIABLE 332 -#define TK_VIEW 333 -#define TK_WAL 334 +#define TK_TRIGGER 198 +#define TK_AT_ONCE 199 +#define TK_WINDOW_CLOSE 200 +#define TK_IGNORE 201 +#define TK_EXPIRED 202 +#define TK_FILL_HISTORY 203 +#define TK_UPDATE 204 +#define TK_SUBTABLE 205 +#define TK_KILL 206 +#define TK_CONNECTION 207 +#define TK_TRANSACTION 208 +#define TK_BALANCE 209 +#define TK_VGROUP 210 +#define TK_LEADER 211 +#define TK_MERGE 212 +#define TK_REDISTRIBUTE 213 +#define TK_SPLIT 214 +#define TK_DELETE 215 +#define TK_INSERT 216 +#define TK_NULL 217 +#define TK_NK_QUESTION 218 +#define TK_NK_ARROW 219 +#define TK_ROWTS 220 +#define TK_QSTART 221 +#define TK_QEND 222 +#define TK_QDURATION 223 +#define TK_WSTART 224 +#define TK_WEND 225 +#define TK_WDURATION 226 +#define TK_IROWTS 227 +#define TK_ISFILLED 228 +#define TK_CAST 229 +#define TK_NOW 230 +#define TK_TODAY 231 +#define TK_TIMEZONE 232 +#define TK_CLIENT_VERSION 233 +#define TK_SERVER_VERSION 234 +#define TK_SERVER_STATUS 235 +#define TK_CURRENT_USER 236 +#define TK_CASE 237 +#define TK_WHEN 238 +#define TK_THEN 239 +#define TK_ELSE 240 +#define TK_BETWEEN 241 +#define TK_IS 242 +#define TK_NK_LT 243 +#define TK_NK_GT 244 +#define TK_NK_LE 245 +#define TK_NK_GE 246 +#define TK_NK_NE 247 +#define TK_MATCH 248 +#define TK_NMATCH 249 +#define TK_CONTAINS 250 +#define TK_IN 251 +#define TK_JOIN 252 +#define TK_INNER 253 +#define TK_SELECT 254 +#define TK_DISTINCT 255 +#define TK_WHERE 256 +#define TK_PARTITION 257 +#define TK_BY 258 +#define TK_SESSION 259 +#define TK_STATE_WINDOW 260 +#define TK_EVENT_WINDOW 261 +#define TK_SLIDING 262 +#define TK_FILL 263 +#define TK_VALUE 264 +#define TK_VALUE_F 265 +#define TK_NONE 266 +#define TK_PREV 267 +#define TK_NULL_F 268 +#define TK_LINEAR 269 +#define TK_NEXT 270 +#define TK_HAVING 271 +#define TK_RANGE 272 +#define TK_EVERY 273 +#define TK_ORDER 274 +#define TK_SLIMIT 275 +#define TK_SOFFSET 276 +#define TK_LIMIT 277 +#define TK_OFFSET 278 +#define TK_ASC 279 +#define TK_NULLS 280 +#define TK_ABORT 281 +#define TK_AFTER 282 +#define TK_ATTACH 283 +#define TK_BEFORE 284 +#define TK_BEGIN 285 +#define TK_BITAND 286 +#define TK_BITNOT 287 +#define TK_BITOR 288 +#define TK_BLOCKS 289 +#define TK_CHANGE 290 +#define TK_COMMA 291 +#define TK_CONCAT 292 +#define TK_CONFLICT 293 +#define TK_COPY 294 +#define TK_DEFERRED 295 +#define TK_DELIMITERS 296 +#define TK_DETACH 297 +#define TK_DIVIDE 298 +#define TK_DOT 299 +#define TK_EACH 300 +#define TK_FAIL 301 +#define TK_FILE 302 +#define TK_FOR 303 +#define TK_GLOB 304 +#define TK_ID 305 +#define TK_IMMEDIATE 306 +#define TK_IMPORT 307 +#define TK_INITIALLY 308 +#define TK_INSTEAD 309 +#define TK_ISNULL 310 +#define TK_KEY 311 +#define TK_MODULES 312 +#define TK_NK_BITNOT 313 +#define TK_NK_SEMI 314 +#define TK_NOTNULL 315 +#define TK_OF 316 +#define TK_PLUS 317 +#define TK_PRIVILEGE 318 +#define TK_RAISE 319 +#define TK_RESTRICT 320 +#define TK_ROW 321 +#define TK_SEMI 322 +#define TK_STAR 323 +#define TK_STATEMENT 324 +#define TK_STRICT 325 +#define TK_STRING 326 +#define TK_TIMES 327 +#define TK_VALUES 328 +#define TK_VARIABLE 329 +#define TK_VIEW 330 +#define TK_WAL 331 + #define TK_NK_SPACE 600 #define TK_NK_COMMENT 601 diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index f203d5ef77..f737499293 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -132,7 +132,7 @@ typedef struct { } SWalRef; typedef struct { - // int8_t scanUncommited; + int8_t scanUncommited; int8_t scanNotApplied; int8_t scanMeta; int8_t enableRef; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 78092b65a8..10c42bb67d 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -1170,6 +1170,7 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { } atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); + taosThreadMutexLock(&pTscObj->mutex); if (pTscObj->passInfo.fp) { atomic_sub_fetch_32(&pAppHbMgr->passKeyCnt, 1); @@ -1178,4 +1179,6 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { } // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner -void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; } +void taos_set_hb_quit(int8_t quitByKill) { + clientHbMgr.quitByKill = quitByKill; +} diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 443c276cd1..6170b0a056 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -679,7 +679,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO SField *field = taosArrayGet(results, j); len += field->bytes; } - if (len > maxLen) { + if(len > maxLen){ return isTag ? TSDB_CODE_PAR_INVALID_TAGS_LENGTH : TSDB_CODE_PAR_INVALID_ROW_LENGTH; } @@ -1586,9 +1586,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL do { code = smlModifyDBSchemas(info); - if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS || - code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH || - code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) { + if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS + || code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH + || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) { break; } taosMsleep(100); @@ -1614,7 +1614,7 @@ void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawL if (tsSlowLogScope & SLOW_LOG_TYPE_INSERT) { int32_t len = 0; int32_t rlen = 0; - char *p = NULL; + char* p = NULL; if (lines && lines[0]) { len = strlen(lines[0]); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4f5edd8a59..87aee4a8a3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1478,13 +1478,8 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[80]; -<<<<<<< HEAD - tFormatOffset(buf, 80, &pVgCur->currentOffset); - tscDebug("consumer:0x%" PRIx64 ", doUpdateLocalEp current vg, epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, tmq->epoch, pVgCur->vgId, -======= tFormatOffset(buf, 80, &pVgCur->offsetInfo.currentOffset); tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, ->>>>>>> enh/3.0 vgKey, buf); SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows}; @@ -1693,12 +1688,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p return handleErrorBeforePoll(pVg, pTmq); } -<<<<<<< HEAD - sendInfo->msgInfo = (SDataBuf){ .pData = msg, .len = msgSize, .handle = NULL }; - -======= sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; ->>>>>>> enh/3.0 sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; @@ -1816,16 +1806,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } -<<<<<<< HEAD - if(pDataRsp->rspOffset.type != 0){ // if offset is validate - pVg->currentOffset = pDataRsp->rspOffset; // update the local offset value only for the returned values. - } -======= // update the local offset value only for the returned values. pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; // update the status ->>>>>>> enh/3.0 atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // update the valid wal version range @@ -1869,13 +1853,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; -<<<<<<< HEAD if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate - pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset; } -======= - pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset; ->>>>>>> enh/3.0 atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); @@ -1893,13 +1873,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; -<<<<<<< HEAD if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate - pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; } -======= - pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; ->>>>>>> enh/3.0 atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->taosxRsp.blockNum == 0) { diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 6bb3b321d2..b04727bfc0 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1080,7 +1080,6 @@ TEST(clientCase, sub_tb_test) { ASSERT_NE(pConn, nullptr); tmq_conf_t* conf = tmq_conf_new(); -<<<<<<< HEAD int32_t ts = taosGetTimestampMs()%INT32_MAX; char consumerGroupid[128] = {0}; @@ -1089,11 +1088,6 @@ TEST(clientCase, sub_tb_test) { tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); tmq_conf_set(conf, "group.id", consumerGroupid); -======= - tmq_conf_set(conf, "enable.auto.commit", "false"); - tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName1024"); ->>>>>>> enh/3.0 tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); @@ -1137,13 +1131,9 @@ TEST(clientCase, sub_tb_test) { while (1) { TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); -<<<<<<< HEAD if (pRes) { char buf[128]; -======= - if (pRes != NULL) { ->>>>>>> enh/3.0 const char* topicName = tmq_get_topic_name(pRes); // const char* dbName = tmq_get_db_name(pRes); // int32_t vgroupId = tmq_get_vgroup_id(pRes); @@ -1152,26 +1142,7 @@ TEST(clientCase, sub_tb_test) { // printf("db: %s\n", dbName); // printf("vgroup id: %d\n", vgroupId); -<<<<<<< HEAD - while (1) { - TAOS_ROW row = taos_fetch_row(pRes); - if (row == NULL) { - break; - } - - fields = taos_fetch_fields(pRes); - numOfFields = taos_field_count(pRes); - totalRows += 1; -// if (totalRows % 100000 == 0) { - taos_print_row(buf, row, fields, numOfFields); - printf("row content: %s\n", buf); -// } - } - - taos_free_result(pRes); -======= printSubResults(pRes, &totalRows); ->>>>>>> enh/3.0 } else { // tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin); // break; @@ -1180,6 +1151,11 @@ TEST(clientCase, sub_tb_test) { tmq_commit_sync(tmq, pRes); if (pRes != NULL) { taos_free_result(pRes); + // if ((++count) > 1) { + // break; + // } + } else { + break; } tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, pAssign[0].begin); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 61d76c3f4e..814a155cfb 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -212,7 +212,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64 ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d " "learnerReplica:%d learnerSelfIndex:%d strict:%d", - req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, + req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression, @@ -224,7 +224,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { req.replicas[i].id); } for (int32_t i = 0; i < req.learnerReplica; ++i) { - dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn, + dInfo("vgId:%d, learnerReplica:%d ep:%s:%u dnode:%d", req.vgId, i, req.learnerReplicas[i].fqdn, req.learnerReplicas[i].port, req.replicas[i].id); } @@ -278,7 +278,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } } - + if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { tFreeSCreateVnodeReq(&req); dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); @@ -326,7 +326,7 @@ _OVER: vnodeClose(pImpl); vnodeDestroy(path, pMgmt->pTfs); } else { - dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", + dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId, TMSG_INFO(pMsg->msgType)); } @@ -346,7 +346,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { req.learnerSelfIndex = -1; } - dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", + dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId, TMSG_INFO(pMsg->msgType)); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); @@ -386,8 +386,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id); } - if (req.replica <= 0 || - (req.selfIndex < 0 && req.learnerSelfIndex <0)|| + if (req.replica <= 0 || + (req.selfIndex < 0 && req.learnerSelfIndex <0)|| req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) { terrno = TSDB_CODE_INVALID_MSG; dError("vgId:%d, failed to alter replica since invalid msg", vgId); @@ -402,7 +402,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { else{ pReplica = &req.learnerReplicas[req.learnerSelfIndex]; } - + if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort || strcmp(pReplica->fqdn, tsLocalFqdn) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -447,7 +447,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } - dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered", + dInfo("vgId:%d, vnode management handle msgType:%s, end to process alter-node-type-request, vnode config is altered", req.vgId, TMSG_INFO(pMsg->msgType)); return 0; } @@ -552,7 +552,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vgId = alterReq.vgId; dInfo("vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d " - "learnerSelfIndex:%d strict:%d", + "learnerSelfIndex:%d strict:%d", vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex, alterReq.strict); for (int32_t i = 0; i < alterReq.replica; ++i) { @@ -563,9 +563,9 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SReplica *pReplica = &alterReq.learnerReplicas[i]; dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); } - - if (alterReq.replica <= 0 || - (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex <0)|| + + if (alterReq.replica <= 0 || + (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex <0)|| alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) { terrno = TSDB_CODE_INVALID_MSG; dError("vgId:%d, failed to alter replica since invalid msg", vgId); @@ -579,7 +579,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { else{ pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex]; } - + if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort || strcmp(pReplica->fqdn, tsLocalFqdn) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -631,7 +631,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } dInfo("vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d " - "learnerSelfIndex:%d strict:%d", + "learnerSelfIndex:%d strict:%d", vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex, alterReq.strict); return 0; @@ -711,12 +711,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK_TO_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; -<<<<<<< HEAD if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; -======= - if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; ->>>>>>> enh/3.0 if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index da4a42ccce..117c1082a5 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -816,6 +816,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { SMqConsumerObj *pConsumer = NULL; void *buf = NULL; + terrno = 0; int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) { goto CM_DECODE_OVER; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index a834b164e2..a8e9db28e9 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -298,7 +298,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg } if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){ - createReq.replica++; + createReq.replica++; } else{ createReq.learnerReplica++; @@ -310,14 +310,14 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg return NULL; } - mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", + mInfo("vgId:%d, build create vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", createReq.vgId, createReq.replica, createReq.selfIndex, createReq.learnerReplica, createReq.learnerReplica, createReq.strict); for (int32_t i = 0; i < createReq.replica; ++i) { mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.replicas[i].fqdn, createReq.replicas[i].port); } for (int32_t i = 0; i < createReq.learnerReplica; ++i) { - mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn, + mInfo("vgId:%d, replica:%d ep:%s:%u", createReq.vgId, i, createReq.learnerReplicas[i].fqdn, createReq.learnerReplicas[i].port); } @@ -397,13 +397,13 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p if(pVgroup->vnodeGid[v].nodeRole == TAOS_SYNC_ROLE_VOTER){ pReplica = &alterReq.replicas[alterReq.replica]; - alterReq.replica++; + alterReq.replica++; } else{ pReplica = &alterReq.learnerReplicas[alterReq.learnerReplica]; alterReq.learnerReplica++; } - + SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pVgidDnode == NULL) return NULL; @@ -425,14 +425,14 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p } } - mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", - alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, + mInfo("vgId:%d, build alter vnode req, replica:%d selfIndex:%d learnerReplica:%d learnerSelfIndex:%d strict:%d", + alterReq.vgId, alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, alterReq.learnerSelfIndex, alterReq.strict); for (int32_t i = 0; i < alterReq.replica; ++i) { mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port); } for (int32_t i = 0; i < alterReq.learnerReplica; ++i) { - mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, + mInfo("vgId:%d, learnerReplica:%d ep:%s:%u", alterReq.vgId, i, alterReq.learnerReplicas[i].fqdn, alterReq.learnerReplicas[i].port); } @@ -1296,7 +1296,7 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, return 0; } -int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, +int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pDnode) { STransAction action = {0}; action.epSet = mndGetDnodeEpset(pDnode); @@ -2023,7 +2023,7 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra } else { - mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", + mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist, online); } @@ -2158,7 +2158,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb return 0; } -int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, +int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, SDnodeObj *pDnode) { SVgObj newVgroup = {0}; memcpy(&newVgroup, pVgroup, sizeof(SVgObj)); @@ -2169,7 +2169,7 @@ int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj if(newVgroup.replica == 1){ int selected = 0; for(int i = 0; i < newVgroup.replica; i++){ - newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER; if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){ selected = i; } @@ -2183,12 +2183,12 @@ int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj } else{ newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER; - } + } } if (mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode) != 0) return -1; for(int i = 0; i < newVgroup.replica; i++){ - newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER; + newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER; if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){ } } @@ -2211,8 +2211,6 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, return 0; } -<<<<<<< HEAD -======= static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) { SSdbRaw *pRaw = mndVgroupActionEncode(pVg); if (pRaw == NULL) goto _err; @@ -2225,7 +2223,6 @@ _err: return -1; } ->>>>>>> enh/3.0 int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t code = -1; STrans *pTrans = NULL; @@ -2534,11 +2531,11 @@ _OVER: bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } -bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) { +bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) { for(int i = 0; i < pVgroup->replica; i++){ if(pVgroup->vnodeGid[i].dnodeId == dnodeId) return true; } - return false; + return false; } static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs, diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e9811a56f1..edaf72c41f 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -105,13 +105,6 @@ typedef struct { int8_t exec; } STqHandle; -typedef struct { - SMqDataRsp* pDataRsp; - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - SRpcHandleInfo info; - STqHandle* pHandle; -} STqPushEntry; - struct STQ { SVnode* pVnode; char* path; @@ -149,14 +142,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); -<<<<<<< HEAD -int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); -int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle); -======= -int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, - int32_t type, int32_t vgId); -int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId); ->>>>>>> enh/3.0 +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, + int32_t vgId); +int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); // tqMeta int32_t tqMetaOpen(STQ* pTq); @@ -193,16 +181,9 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, - int32_t type, int64_t sver, int64_t ever); - -<<<<<<< HEAD -======= + int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq); -void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver); -void saveOffsetForAllTasks(STQ* pTq, int64_t ver); -void initOffsetForAllRestoreTasks(STQ* pTq); - ->>>>>>> enh/3.0 +bool tqIsHandleExecuting(STqHandle* pHandle); #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 24c8c5efa7..d7f0ef041a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -195,16 +195,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqNotifyClose(STQ*); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -<<<<<<< HEAD int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. -======= -int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); -int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); -int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. ->>>>>>> enh/3.0 int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f065d3ea1e..f6f2b3ec53 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -71,21 +71,10 @@ static void destroyTqHandle(void* data) { walCloseReader(pData->pWalReader); tqCloseReader(pData->execHandle.pTqReader); } -<<<<<<< HEAD if(pData->msg != NULL) { rpcFreeCont(pData->msg->pCont); taosMemoryFree(pData->msg); pData->msg = NULL; -======= -} - -static void tqPushEntryFree(void* data) { - STqPushEntry* p = *(void**)data; - if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - tDeleteMqDataRsp(p->pDataRsp); - } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) { - tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp); ->>>>>>> enh/3.0 } } @@ -162,7 +151,6 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } -<<<<<<< HEAD void tqNotifyClose(STQ* pTq) { if (pTq != NULL) { taosWLockLatch(&pTq->pStreamMeta->lock); @@ -194,7 +182,7 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData int32_t code = 0; if (type == TMQ_MSG_TYPE__POLL_RSP) { - tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); + tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); } @@ -219,7 +207,7 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData tEncoderInit(&encoder, abuf, len); if (type == TMQ_MSG_TYPE__POLL_RSP) { - tEncodeSMqDataRsp(&encoder, pRsp); + tEncodeMqDataRsp(&encoder, pRsp); } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); } @@ -237,33 +225,22 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData return 0; } -int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) { +int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { SMqDataRsp dataRsp = {0}; dataRsp.head.consumerId = pHandle->consumerId; dataRsp.head.epoch = pHandle->epoch; dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP); -======= -int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId) { - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; int64_t sver = 0, ever = 0; - walReaderValidVersionRange(pPushEntry->pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - - tqDoSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType, sver, ever); ->>>>>>> enh/3.0 + walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever); char buf1[80] = {0}; char buf2[80] = {0}; tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", -<<<<<<< HEAD - TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); -======= - vgId, pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); ->>>>>>> enh/3.0 + vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -462,7 +439,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, - consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); + consumerId, vgId, req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; taosRUnLockLatch(&pTq->lock); return -1; @@ -521,6 +498,8 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); + SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, &req); @@ -545,8 +524,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; - if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { - dataRsp.rspOffset.version = sver; + if (reqOffset.type == TMQ_OFFSET__LOG) { + dataRsp.rspOffset.version = currentVer; // return current consume offset value + } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { + dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { dataRsp.rspOffset.version = ever; } else { @@ -564,8 +545,9 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; + int32_t vgId = TD_VID(pTq->pVnode); - tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); + tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; // taosWLockLatch(&pTq->lock); // int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); @@ -580,6 +562,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pHandle->pRef) { walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); } + + while (tqIsHandleExecuting(pHandle)) { + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); + taosMsleep(5); + } + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); @@ -940,11 +928,9 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 } int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - int32_t code; -#if 0 - code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen); - if (code < 0) return code; -#endif + int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + if (tsDisableStream) { return 0; } @@ -970,7 +956,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWLockLatch(&pTq->pStreamMeta->lock); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); if (code < 0) { - tqError("vgId:%d failed to add s-task:%s, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, + tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, streamMetaGetNumOfTasks(pTq->pStreamMeta)); taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; @@ -983,8 +969,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms streamTaskCheckDownstream(pTask, sversion); } - tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode), - pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); + tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, + pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index ba8bbed27e..e1e9bec348 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -16,261 +16,6 @@ #include "tq.h" #include "vnd.h" -<<<<<<< HEAD -======= -#if 0 -void tqTmrRspFunc(void* param, void* tmrId) { - STqHandle* pHandle = (STqHandle*)param; - atomic_store_8(&pHandle->pushHandle.tmrStopped, 1); -} - -static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) { - SStreamDataSubmit* pSubmit = *ppSubmit; - while (pSubmit != NULL) { - if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { - } - // update processed - atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); - streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitDestroy(pSubmit); - if (pRsp->blockNum > 0) { - *ppSubmit = pSubmit; - return 0; - } else { - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - } - } - *ppSubmit = pSubmit; - return -1; -} - -int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { - SMqDataRsp rsp = {0}; - // 1. guard and set status executing - int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE, - TASK_EXEC_STATUS__EXECUTING); - if (execStatus == TASK_EXEC_STATUS__IDLE) { - SStreamDataSubmit* pSubmit = NULL; - // 2. check processedVer - // 2.1. if not missed, get msg from queue - // 2.2. if missed, scan wal - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - while (pHandle->pushHandle.processedVer <= pSubmit->ver) { - // read from wal - } - while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) { - streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitDestroy(pSubmit); - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - if (pSubmit == NULL) break; - } - // 3. exec, after each success, update processed ver - // first run - if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { - goto SEND_RSP; - } - // set exec status closing - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING); - // second run - if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { - goto SEND_RSP; - } - // set exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); - } -SEND_RSP: - // 4. if get result - // 4.1 set exec input status blocked and exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); - // 4.2 rpc send - rsp.rspOffset = pHandle->pushHandle.processedVer; - /*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/ - /*return -1;*/ - /*}*/ - // 4.3 clear rpc info - memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); - return 0; -} - -int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) { - memset(&pHandle->pushHandle, 0, sizeof(STqPushHandle)); - pHandle->pushHandle.inputQ.queue = taosOpenQueue(); - pHandle->pushHandle.inputQ.qall = taosAllocateQall(); - if (pHandle->pushHandle.inputQ.queue == NULL || pHandle->pushHandle.inputQ.qall == NULL) { - if (pHandle->pushHandle.inputQ.queue) { - taosCloseQueue(pHandle->pushHandle.inputQ.queue); - } - if (pHandle->pushHandle.inputQ.qall) { - taosFreeQall(pHandle->pushHandle.inputQ.qall); - } - return -1; - } - return 0; -} - -int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer, - int64_t timeout) { - memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo)); - atomic_store_64(&pHandle->pushHandle.reqId, reqId); - atomic_store_64(&pHandle->pushHandle.processedVer, processedVer); - atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL); - atomic_store_8(&pHandle->pushHandle.tmrStopped, 0); - taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId); - return 0; -} - -int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) { - int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus); - if (inputStatus == TASK_INPUT_STATUS__NORMAL) { - SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit); - if (pSubmitClone == NULL) { - return -1; - } - taosWriteQitem(pHandle->pushHandle.inputQ.queue, pSubmitClone); - return 0; - } - return -1; -} - -int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) { - // - return 0; -} - -int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) { - if (msgType != TDMT_VND_SUBMIT) return 0; - void* pIter = NULL; - STqHandle* pHandle = NULL; - SSubmitReq* pReq = (SSubmitReq*)msg; - int32_t workerId = 4; - int64_t fetchOffset = ver; - - while (1) { - pIter = taosHashIterate(pTq->pushMgr, pIter); - if (pIter == NULL) break; - pHandle = *(STqHandle**)pIter; - - taosWLockLatch(&pHandle->pushHandle.lock); - - SMqDataRsp rsp = {0}; - rsp.reqOffset = pHandle->pushHandle.reqOffset; - rsp.blockData = taosArrayInit(0, sizeof(void*)); - rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); - - if (msgType == TDMT_VND_SUBMIT) { - tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); - } else { - tqError("tq push unexpected msg type %d", msgType); - } - - if (rsp.blockNum == 0) { - taosWUnLockLatch(&pHandle->pushHandle.lock); - continue; - } - - rsp.rspOffset = fetchOffset; - - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - // todo free - return -1; - } - - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch; - ((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqDataBlkRsp(&abuf, &rsp); - - SRpcMsg resp = { - .info = pHandle->pushHandle.rpcInfo, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - tmsgSendRsp(&resp); - - memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); - taosWUnLockLatch(&pHandle->pushHandle.lock); - - tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64, - TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, - rsp.reqOffset, rsp.rspOffset); - - // TODO destroy - taosArrayDestroy(rsp.blockData); - taosArrayDestroy(rsp.blockDataLen); - } - - return 0; -} -#endif - -typedef struct { - void* pKey; - int64_t keyLen; -} SItem; - -static void recordPushedEntry(SArray* cachedKey, void* pIter); -static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq); - -static void freeItem(void* param) { - SItem* p = (SItem*)param; - taosMemoryFree(p->pKey); -} - -static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData, - int32_t dataLen, SArray* pCachedKey) { - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - if (pRsp->reqOffset.version >= ver) { - tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, - pRsp->reqOffset.version, ver); - return; - } - - qTaskInfo_t pTaskInfo = pExec->task; - - // prepare scan mem data - SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver}; - - if (qStreamSetScanMemData(pTaskInfo, submit) != 0) { - return; - } - qStreamSetOpen(pTaskInfo); - // here start to scan submit block to extract the subscribed data - int32_t totalRows = 0; - - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) { - tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); - } - - if (pDataBlock == NULL) { - break; - } - - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - pRsp->blockNum++; - totalRows += pDataBlock->info.rows; - } - - tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum, - totalRows); - - if (pRsp->blockNum > 0) { - tqOffsetResetToLog(&pRsp->rspOffset, ver); - tqPushDataRsp(pPushEntry, vgId); - recordPushedEntry(pCachedKey, pIter); - } -} - ->>>>>>> enh/3.0 int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType == TDMT_VND_SUBMIT) { @@ -310,42 +55,17 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); } else { - tqPushDataRsp(pTq, pHandle); + tqPushDataRsp(pHandle, vgId); void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; } -<<<<<<< HEAD memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); -======= - pPushEntry->pHandle = pHandle; - pPushEntry->info = pRpcMsg->info; - memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); - - if (type == TMQ_MSG_TYPE__TAOSX_RSP) { - pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp)); - memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp)); - } else if (type == TMQ_MSG_TYPE__POLL_RSP) { - pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp)); - memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp)); - } - - SMqRspHead* pHead = &pPushEntry->pDataRsp->head; - - pHead->consumerId = consumerId; - pHead->epoch = pRequest->epoch; - pHead->mqMsgType = type; - - taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*)); - - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", - consumerId, pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr)); ->>>>>>> enh/3.0 return 0; } @@ -357,19 +77,11 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); if(pHandle->msg != NULL) { - tqPushDataRsp(pTq, pHandle); + tqPushDataRsp(pHandle, vgId); -<<<<<<< HEAD rpcFreeCont(pHandle->msg->pCont); taosMemoryFree(pHandle->msg); pHandle->msg = NULL; -======= - if (rspConsumer) { // rsp the old consumer with empty block. - tqPushDataRsp(*pEntry, vgId); - } - - taosHashRemove(pTq->pPushMgr, pKey, keyLen); ->>>>>>> enh/3.0 } return 0; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a48cef73cf..885eb65160 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -41,80 +41,7 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI return TSDB_CODE_SUCCESS; } -<<<<<<< HEAD -static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) { -======= -void initOffsetForAllRestoreTasks(STQ* pTq) { - void* pIter = NULL; - - while(1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, since not ready, status %d", pTask->id.idStr, pTask->status.taskStatus); - continue; - } - - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, pTask->chkInfo.version); - } - } -} - -void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { - void* pIter = NULL; - - while(1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, - pTask->status.taskStatus); - continue; - } - - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, ver); - } - } -} - -void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) { - STqOffset offset = {0}; - tqOffsetResetToLog(&offset.val, ver); - - tstrncpy(offset.subKey, pKey, tListLen(offset.subKey)); - - // keep the offset info in the offset store - tqOffsetWrite(pOffsetStore, &offset); -} - int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) { ->>>>>>> enh/3.0 pRsp->reqOffset = pReq->reqOffset; pRsp->blockData = taosArrayInit(0, sizeof(void*)); @@ -235,60 +162,56 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -static bool isHandleExecuting(STqHandle* pHandle){ - return 1 == atomic_load_8(&pHandle->exec); -} +bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); } static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { - char buf[80] = {0}; uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); int code = 0; SMqDataRsp dataRsp = {0}; -<<<<<<< HEAD - tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - qTaskInfo_t task = pHandle->execHandle.task; - if(qTaskIsExecuting(task)){ - code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); - tDeleteSMqDataRsp(&dataRsp); - return code; - } + tqInitDataRsp(&dataRsp, pRequest); +// qTaskInfo_t task = pHandle->execHandle.task; +// if (qTaskIsExecuting(task)) { +// code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); +// tDeleteMqDataRsp(&dataRsp); +// return code; +// } - while(isHandleExecuting(pHandle)){ - tqInfo("sub is executing, pHandle:%p", pHandle); + // todo add more status check to avoid race condition + while (tqIsHandleExecuting(pHandle)) { + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); taosMsleep(5); } + atomic_store_8(&pHandle->exec, 1); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); - if(code != 0) { + if (code != 0) { goto end; } // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - if(pHandle->noDataPollCnt >= NO_POLL_CNT){ // send poll result to client if no data 5 times to avoid lost data + if (pHandle->noDataPollCnt >= NO_POLL_CNT) { // send poll result to client if no data 5 times to avoid lost data pHandle->noDataPollCnt = 0; // lock taosWLockLatch(&pTq->lock); code = tqRegisterPushHandle(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); - tDeleteSMqDataRsp(&dataRsp); + tDeleteMqDataRsp(&dataRsp); atomic_store_8(&pHandle->exec, 0); return code; - } - else{ + } else { pHandle->noDataPollCnt++; } } - - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); // NOTE: this pHandle->consumerId may have been changed already. + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); end: { @@ -298,8 +221,8 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); tDeleteMqDataRsp(&dataRsp); } - atomic_store_8(&pHandle->exec, 0); + atomic_store_8(&pHandle->exec, 0); return code; } @@ -310,17 +233,18 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); - qTaskInfo_t task = pHandle->execHandle.task; - if(qTaskIsExecuting(task)){ - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } +// qTaskInfo_t task = pHandle->execHandle.task; +// if(qTaskIsExecuting(task)){ +// code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); +// tDeleteSTaosxRsp(&taosxRsp); +// return code; +// } - while(isHandleExecuting(pHandle)){ - tqInfo("sub is executing, pHandle:%p", pHandle); + while (tqIsHandleExecuting(pHandle)) { + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); taosMsleep(5); } + atomic_store_8(&pHandle->exec, 1); if (offset->type != TMQ_OFFSET__LOG) { @@ -340,7 +264,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 ",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; }else { *offset = taosxRsp.rspOffset; @@ -356,6 +280,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, code = -1; goto end; } + walSetReaderCapacity(pHandle->pWalReader, 2048); int totalRows = 0; while (1) { @@ -369,7 +294,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } @@ -381,7 +306,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if(totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } @@ -390,7 +315,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; - code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId); goto end; } @@ -409,7 +334,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } else { fetchVer++; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index e0e837dc58..c0a8de5743 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2325,7 +2325,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC TSKEY lastRowTs = TSKEY_MAX; CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader, + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader, &pr->pDataFReaderLast, pr->lastTs); do { @@ -2493,7 +2493,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, TSKEY lastRowTs = TSKEY_MAX; CacheNextRowIter iter = {0}; - nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader, + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader, &pr->pDataFReaderLast, pr->lastTs); do { diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index cd511e0d23..97b648201c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -302,12 +302,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) { return rowsNum; } -void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum) { +void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj* pTableMap, int64_t *rowsNum) { taosRLockLatch(&pMemTable->latch); for (int32_t i = 0; i < pMemTable->nBucket; ++i) { STbData *pTbData = pMemTable->aBucket[i]; while (pTbData) { - void *p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); + void* p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid)); if (p == NULL) { pTbData = pTbData->next; continue; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c9d0eebe2b..3f365c7048 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -250,6 +250,7 @@ typedef struct STagScanInfo { SSDataBlock* pRes; SColMatchInfo matchInfo; int32_t curPos; + SLimitNode* pSlimit; SReadHandle readHandle; STableListInfo* pTableListInfo; } STagScanInfo; diff --git a/tests/parallel_test/container_build.sh b/tests/parallel_test/container_build.sh index 8a68e5f76d..0fc29c241b 100755 --- a/tests/parallel_test/container_build.sh +++ b/tests/parallel_test/container_build.sh @@ -68,8 +68,8 @@ docker run \ -v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ - -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ - --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=false;make -j || exit 1" + --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1" + # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ if [[ -d ${WORKDIR}/debugNoSan ]] ;then echo "delete ${WORKDIR}/debugNoSan" @@ -97,11 +97,7 @@ docker run \ -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \ -<<<<<<< HEAD - --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=false -DPORTABLE=true -DWITH_GFLAGS=false;make -j || exit 1 " -======= --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j || exit 1 " ->>>>>>> enh/3.0 mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim index 5f45b446ca..db2a22205f 100644 --- a/tests/script/tsim/alter/table.sim +++ b/tests/script/tsim/alter/table.sim @@ -657,33 +657,36 @@ if $data20 != null then return -1 endi -print =============== error for normal table -sql create table tb2023(ts timestamp, f int); -sql_error alter table tb2023 add column v varchar(16375); -sql_error alter table tb2023 add column v varchar(16385); -sql_error alter table tb2023 add column v varchar(33100); -sql alter table tb2023 add column v varchar(16374); -sql_error alter table tb2023 modify column v varchar(16375); -sql desc tb2023 -sql alter table tb2023 drop column v -sql_error alter table tb2023 add column v nchar(4094); -sql alter table tb2023 add column v nchar(4093); -sql_error alter table tb2023 modify column v nchar(4094); -sql desc tb2023 - -print =============== error for super table -sql create table stb2023(ts timestamp, f int) tags(t1 int); -sql_error alter table stb2023 add column v varchar(16375); -sql_error alter table stb2023 add column v varchar(16385); -sql_error alter table stb2023 add column v varchar(33100); -sql alter table stb2023 add column v varchar(16374); -sql_error alter table stb2023 modify column v varchar(16375); -sql desc stb2023 -sql alter table stb2023 drop column v -sql_error alter table stb2023 add column v nchar(4094); -sql alter table stb2023 add column v nchar(4093); -sql_error alter table stb2023 modify column v nchar(4094); -sql desc stb2023 +#print =============== error for normal table +#sql create table tb2023(ts timestamp, f int); +#sql_error alter table tb2023 add column v varchar(65535); +#sql_error alter table tb2023 add column v varchar(65535); +#sql_error alter table tb2023 add column v varchar(65530); +#sql alter table tb2023 add column v varchar(16374); +#sql_error alter table tb2023 modify column v varchar(65536); +#sql desc tb2023 +#sql alter table tb2023 drop column v +#sql_error alter table tb2023 add column v nchar(16384); +#sql alter table tb2023 add column v nchar(4093); +#sql_error alter table tb2023 modify column v nchar(16384); +#sql_error alter table tb2023 add column v nchar(16384); +#sql alter table tb2023 drop column v +#sql alter table tb2023 add column v nchar(16374); +#sql desc tb2023 +# +#print =============== error for super table +#sql create table stb2023(ts timestamp, f int) tags(t1 int); +#sql_error alter table stb2023 add column v varchar(65535); +#sql_error alter table stb2023 add column v varchar(65536); +#sql_error alter table stb2023 add column v varchar(33100); +#sql alter table stb2023 add column v varchar(16374); +#sql_error alter table stb2023 modify column v varchar(16375); +#sql desc stb2023 +#sql alter table stb2023 drop column v +#sql_error alter table stb2023 add column v nchar(4094); +#sql alter table stb2023 add column v nchar(4093); +#sql_error alter table stb2023 modify column v nchar(4094); +#sql desc stb2023 print ======= over sql drop database d1 diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim index d569e47735..f892115735 100644 --- a/tests/script/tsim/parser/alter_column.sim +++ b/tests/script/tsim/parser/alter_column.sim @@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(0); -sql_error alter table tb modify column c2 binary(17000); +sql_error alter table tb modify column c2 binary(65600); sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 nchar(10); diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index f1dc8ebe79..ac5aff4727 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -989,7 +989,7 @@ int sml_ts2164_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS_RES *pRes = - taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test BUFFER 384 MINROWS 1000 PAGES 256 PRECISION 'ns'"); + taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test MINROWS 1000 PRECISION 'ns'"); taos_free_result(pRes); const char *sql[] = { @@ -1139,8 +1139,8 @@ int sml_td23881_Test() { taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_23881 PRECISION 'ns'"); taos_free_result(pRes); - char tmp[16375] = {0}; - memset(tmp, 'a', 16374); + char tmp[26375] = {0}; + memset(tmp, 'a', 26374); char sql[102400] = {0}; sprintf(sql,"lujixfvqor,t0=t c0=f,c1=\"%s\",c2=\"%s\",c3=\"%s\",c4=\"wthvqxcsrlps\" 1626006833639000000", tmp, tmp, tmp); @@ -1385,8 +1385,8 @@ int main(int argc, char *argv[]) { ASSERT(!ret); ret = sml_ts3116_Test(); ASSERT(!ret); - ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file - ASSERT(!ret); +// ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file +// ASSERT(!ret); ret = sml_ts3303_Test(); // this test case need config sml table name using ./sml_test config_file ASSERT(!ret);