From 63cc4e27b11d1f2992beccf8d28a0f094e5da2ae Mon Sep 17 00:00:00 2001
From: Hongze Cheng
Date: Fri, 9 Dec 2022 15:50:22 +0800
Subject: [PATCH 01/11] more code

---
 source/dnode/vnode/src/inc/vnodeInt.h      |   3 +
 source/dnode/vnode/src/vnd/vnodeCfg.c      |   3 -
 source/dnode/vnode/src/vnd/vnodeCommit.c   |   4 +-
 source/dnode/vnode/src/vnd/vnodeSnapshot.c | 158 ++++++++++++++++-----
 source/dnode/vnode/src/vnd/vnodeSvr.c      |   4 -
 5 files changed, 123 insertions(+), 49 deletions(-)

diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index 8cf212cb1d..fb15af5fac 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -87,11 +87,14 @@ typedef struct SCommitInfo SCommitInfo;
 #define VNODE_RSMA1_DIR "rsma1"
 #define VNODE_RSMA2_DIR "rsma2"
 
+#define VND_INFO_FNAME "vnode.json"
+
 // vnd.h
 void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
 void  vnodeBufPoolFree(SVBufPool* pPool, void* p);
 void  vnodeBufPoolRef(SVBufPool* pPool);
 void  vnodeBufPoolUnRef(SVBufPool* pPool);
+int   vnodeDecodeInfo(uint8_t* pData, SVnodeInfo* pInfo);
 
 // meta
 typedef struct SMCtbCursor SMCtbCursor;
diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c
index 5adb2eb359..b5337a1c7b 100644
--- a/source/dnode/vnode/src/vnd/vnodeCfg.c
+++ b/source/dnode/vnode/src/vnd/vnodeCfg.c
@@ -135,9 +135,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
     tjsonAddItemToArray(pNodeInfoArr, pNodeInfo);
   }
 
-  // add tsdb page size config
-  if (tjsonAddIntegerToObject(pJson, "tsdbPageSize", pCfg->tsdbPageSize) < 0) return -1;
-
   return 0;
 }
 
diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c
index cd56468371..5860ef72fd 100644
--- a/source/dnode/vnode/src/vnd/vnodeCommit.c
+++ b/source/dnode/vnode/src/vnd/vnodeCommit.c
@@ -16,11 +16,9 @@
 #include "vnd.h"
 #include "vnodeInt.h"
 
-#define VND_INFO_FNAME     "vnode.json"
 #define VND_INFO_FNAME_TMP "vnode_tmp.json"
 
 static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
-static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
 static int vnodeCommitImpl(SCommitInfo *pInfo);
 
 int vnodeBegin(SVnode *pVnode) {
@@ -407,7 +405,7 @@ _err:
   return -1;
 }
 
-static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
+int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
   SJson *pJson = NULL;
 
   pJson = tjsonParse(pData);
diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
index a34744a1da..742032a2a3 100644
--- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c
+++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
@@ -21,6 +21,8 @@ struct SVSnapReader {
   int64_t sver;
   int64_t ever;
   int64_t index;
+  // config
+  int8_t cfgDone;
   // meta
   int8_t metaDone;
   SMetaSnapReader *pMetaReader;
@@ -88,6 +90,53 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader) {
 
 int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
   int32_t code = 0;
 
+  // CONFIG ==============
+  // FIXME: if commit multiple times and the config changed?
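+  //        (vnode.json is rewritten on every commit via vnodeCommitInfo(), so
+  //        a config blob read here can lag behind the data blocks sent later
+  //        in the same snapshot; the write side appears to compensate by
+  //        persisting the received info again in vnodeSnapWriterClose().)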
+  if (!pReader->cfgDone) {
+    char fName[TSDB_FILENAME_LEN];
+    if (pReader->pVnode->pTfs) {
+      snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s%s%s", tfsGetPrimaryPath(pReader->pVnode->pTfs), TD_DIRSEP,
+               pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME);
+    } else {
+      snprintf(fName, TSDB_FILENAME_LEN, "%s%s%s", pReader->pVnode->path, TD_DIRSEP, VND_INFO_FNAME);
+    }
+
+    TdFilePtr pFile = taosOpenFile(fName, TD_FILE_READ);
+    if (NULL == pFile) {
+      code = TAOS_SYSTEM_ERROR(errno);
+      goto _err;
+    }
+
+    int64_t size;
+    if (taosFStatFile(pFile, &size, NULL) < 0) {
+      code = TAOS_SYSTEM_ERROR(errno);
+      taosCloseFile(&pFile);
+      goto _err;
+    }
+
+    *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size + 1);
+    if (*ppData == NULL) {
+      code = TSDB_CODE_OUT_OF_MEMORY;
+      taosCloseFile(&pFile);
+      goto _err;
+    }
+    ((SSnapDataHdr *)(*ppData))->type = SNAP_DATA_CFG;
+    ((SSnapDataHdr *)(*ppData))->size = size + 1;
+    ((SSnapDataHdr *)(*ppData))->data[size] = '\0';
+
+    if (taosReadFile(pFile, ((SSnapDataHdr *)(*ppData))->data, size) < 0) {
+      code = TAOS_SYSTEM_ERROR(errno);
+      taosMemoryFree(*ppData);
+      taosCloseFile(&pFile);
+      goto _err;
+    }
+
+    taosCloseFile(&pFile);
+
+    pReader->cfgDone = 1;
+    goto _exit;
+  }
+
   // META ==============
   if (!pReader->metaDone) {
     // open reader if not
@@ -230,6 +279,8 @@ struct SVSnapWriter {
   int64_t ever;
   int64_t commitID;
   int64_t index;
+  // config
+  SVnodeInfo info;
   // meta
   SMetaSnapWriter *pMetaSnapWriter;
   // tsdb
@@ -248,6 +299,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
   int32_t code = 0;
   SVSnapWriter *pWriter = NULL;
 
+  // commit memory data
+  vnodeAsyncCommit(pVnode);
+  tsem_wait(&pVnode->canCommit);
+
   // alloc
   pWriter = (SVSnapWriter *)taosMemoryCalloc(1, sizeof(*pWriter));
   if (pWriter == NULL) {
@@ -258,16 +313,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
   pWriter->sver = sver;
   pWriter->ever = ever;
 
-  // commit it
-  code = vnodeSyncCommit(pVnode);
-  if (code) {
-    taosMemoryFree(pWriter);
-    goto _err;
-  }
-
   // inc commit ID
-  pVnode->state.commitID++;
-  pWriter->commitID = pVnode->state.commitID;
+  pWriter->commitID = ++pVnode->state.commitID;
 
   vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
         sver, ever, pWriter->commitID);
@@ -284,53 +331,82 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
   int32_t code = 0;
   SVnode *pVnode = pWriter->pVnode;
 
+  if (!rollback) {
+    pVnode->config = pWriter->info.config;
+    pVnode->state = (SVState){.committed = pWriter->info.state.committed,
+                              .applied = pWriter->info.state.committed,
+                              .commitID = pWriter->commitID,
+                              .commitTerm = pWriter->info.state.commitTerm,
+                              .applyTerm = pWriter->info.state.commitTerm};
+    pVnode->statis = pWriter->info.statis;
+    char dir[TSDB_FILENAME_LEN] = {0};
+    if (pWriter->pVnode->pTfs) {
+      snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
+    } else {
+      snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
+    }
+
+    vnodeCommitInfo(dir, &pWriter->info);
+  } else {
+    vnodeRollback(pWriter->pVnode);
+  }
+
   if (pWriter->pMetaSnapWriter) {
     code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
-    if (code) goto _err;
+    if (code) goto _exit;
   }
 
   if (pWriter->pTsdbSnapWriter) {
     code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
-    if (code) goto _err;
+    if (code) goto _exit;
   }
 
   if (pWriter->pRsmaSnapWriter) {
    code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
-    if (code) goto _err;
+    if (code) goto _exit;
   }
 
-  if (!rollback) {
-    SVnodeInfo info = {0};
-    char dir[TSDB_FILENAME_LEN];
+  vnodeBegin(pVnode);
 
-    pVnode->state.committed = pWriter->ever;
-    pVnode->state.applied = pWriter->ever;
-    pVnode->state.applyTerm = pSnapshot->lastApplyTerm;
-    pVnode->state.commitTerm = pSnapshot->lastApplyTerm;
-
-    info.config = pVnode->config;
-    info.state.committed = pVnode->state.applied;
-    info.state.commitTerm = pVnode->state.applyTerm;
-    info.state.commitID = pVnode->state.commitID;
-    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
-    code = vnodeSaveInfo(dir, &info);
-    if (code) goto _err;
-
-    code = vnodeCommitInfo(dir, &info);
-    if (code) goto _err;
-
-    vnodeBegin(pVnode);
+_exit:
+  if (code) {
+    vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
   } else {
-    ASSERT(0);
+    vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
+    taosMemoryFree(pWriter);
+  }
+  tsem_wait(&pVnode->canCommit);
+  return code;
+}
+
+static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
+  int32_t code = 0;
+
+  SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
+
+  // decode info
+  if (vnodeDecodeInfo(pHdr->data, &pWriter->info) < 0) {
+    code = TSDB_CODE_INVALID_MSG;
+    goto _exit;
+  }
+
+  // change some value
+  pWriter->info.state.commitID = pWriter->commitID;
+
+  // modify info as needed
+  char dir[TSDB_FILENAME_LEN] = {0};
+  if (pWriter->pVnode->pTfs) {
+    snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pWriter->pVnode->pTfs), TD_DIRSEP,
+             pWriter->pVnode->path);
+  } else {
+    snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
+  }
+  if (vnodeSaveInfo(dir, &pWriter->info) < 0) {
+    code = terrno;
+    goto _exit;
+  }
 
 _exit:
-  vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
-  taosMemoryFree(pWriter);
-  return code;
-
-_err:
-  vError("vgId:%d, vnode snapshot writer close failed since %s", TD_VID(pWriter->pVnode), tstrerror(code));
   return code;
 }
 
@@ -347,6 +423,10 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
            pHdr->type, nData);
 
   switch (pHdr->type) {
+    case SNAP_DATA_CFG: {
+      code = vnodeSnapWriteInfo(pWriter, pData, nData);
+      if (code) goto _err;
+    } break;
     case SNAP_DATA_META: {
       // meta
       if (pWriter->pMetaSnapWriter == NULL) {
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index 0fc42f3744..6092888136 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -317,11 +317,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
   // commit if need
   if (vnodeShouldCommit(pVnode)) {
     vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
-#if 0
-    vnodeSyncCommit(pVnode);
-#else
     vnodeAsyncCommit(pVnode);
-#endif
 
     // start a new one
     if (vnodeBegin(pVnode) < 0) {

From 7b20c09f3454096919a59c8d3a0035c7b0937963 Mon Sep 17 00:00:00 2001
From: Hongze Cheng
Date: Fri, 9 Dec 2022 16:19:49 +0800
Subject: [PATCH 02/11] more code

---
 source/dnode/vnode/src/inc/vnodeInt.h      |  1 +
 source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 35 +++++++++++++---------
 source/dnode/vnode/src/vnd/vnodeSnapshot.c |  7 +++++
 3 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index fb15af5fac..d6a9dc8557 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -241,6 +241,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
 // STsdbSnapWriter ========================================
 int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter);
 int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
+int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
 int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
 // STqSnapshotReader ==
 int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader);
diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
index 8be4904349..c61ff343ab 100644
--- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c
@@ -1376,27 +1376,34 @@ _exit:
   return code;
 }
 
+int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) {
+  int32_t code = 0;
+  if (pWriter->dWriter.pWriter) {
+    code = tsdbSnapWriteCloseFile(pWriter);
+    if (code) goto _exit;
+  }
+
+  code = tsdbSnapWriteDelEnd(pWriter);
+  if (code) goto _exit;
+
+  code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
+  if (code) goto _exit;
+
+_exit:
+  if (code) {
+    tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code));
+  }
+  return code;
+}
+
 int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
   int32_t code = 0;
   STsdbSnapWriter* pWriter = *ppWriter;
   STsdb* pTsdb = pWriter->pTsdb;
 
   if (rollback) {
-    ASSERT(0);
-    // code = tsdbFSRollback(pWriter->pTsdb->pFS);
-    // if (code) goto _err;
+    tsdbRollbackCommit(pWriter->pTsdb);
   } else {
-    if (pWriter->dWriter.pWriter) {
-      code = tsdbSnapWriteCloseFile(pWriter);
-      if (code) goto _err;
-    }
-
-    code = tsdbSnapWriteDelEnd(pWriter);
-    if (code) goto _err;
-
-    code = tsdbFSPrepareCommit(pWriter->pTsdb, &pWriter->fs);
-    if (code) goto _err;
-
     // lock
     taosThreadRwlockWrlock(&pTsdb->rwLock);
 
diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
index 742032a2a3..69c78e2d18 100644
--- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c
+++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
@@ -331,6 +331,12 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
   int32_t code = 0;
   SVnode *pVnode = pWriter->pVnode;
 
+  // prepare
+  if (pWriter->pTsdbSnapWriter) {
+    tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter);
+  }
+
+  // commit json
   if (!rollback) {
     pVnode->config = pWriter->info.config;
     pVnode->state = (SVState){.committed = pWriter->info.state.committed,
@@ -351,6 +357,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
     vnodeRollback(pWriter->pVnode);
   }
 
+  // commit/rollback sub-system
   if (pWriter->pMetaSnapWriter) {
     code = metaSnapWriterClose(&pWriter->pMetaSnapWriter, rollback);
     if (code) goto _exit;

From 5f2b393c0cf3088b02b4ac80d61378e7c9ebccbf Mon Sep 17 00:00:00 2001
From: Minglei Jin
Date: Fri, 9 Dec 2022 16:23:43 +0800
Subject: [PATCH 03/11] meta/snapshot: use nil heap and abort tdb when rollback

---
 source/dnode/vnode/src/inc/vnodeInt.h      |  5 +++++
 source/dnode/vnode/src/meta/metaCommit.c   | 11 ++++++-----
 source/dnode/vnode/src/meta/metaSnapshot.c |  5 +++--
 source/dnode/vnode/src/vnd/vnodeCommit.c   |  2 +-
 4 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index d6a9dc8557..a35b9c8a70 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -101,6 +101,10 @@ typedef struct SMCtbCursor SMCtbCursor;
 typedef struct SMStbCursor SMStbCursor;
 typedef struct STbUidStore STbUidStore;
 
+#define META_BEGIN_HEAP_BUFFERPOOL 0
+#define META_BEGIN_HEAP_OS         1
+#define META_BEGIN_HEAP_NIL        2
+
 int  metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
 int  metaClose(SMeta* pMeta);
 int  metaBegin(SMeta* pMeta, int8_t fromSys);
@@ -108,6 +112,7 @@ TXN* metaGetTxn(SMeta* pMeta);
 int  metaCommit(SMeta* pMeta, TXN* txn);
 int  metaFinishCommit(SMeta* pMeta, TXN* txn);
 int  metaPrepareAsyncCommit(SMeta* pMeta);
+int  metaAbort(SMeta* pMeta);
 int  metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
 int  metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
 int  metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c
index 5eb27679bb..de3abc2c8e 100644
--- a/source/dnode/vnode/src/meta/metaCommit.c
+++ b/source/dnode/vnode/src/meta/metaCommit.c
@@ -19,19 +19,20 @@ static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBuf
 static FORCE_INLINE void  metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
 
 // begin a meta txn
-int metaBegin(SMeta *pMeta, int8_t fromSys) {
-  void *(*xMalloc)(void *, size_t);
-  void (*xFree)(void *, void *);
+int metaBegin(SMeta *pMeta, int8_t heap) {
+  void *(*xMalloc)(void *, size_t) = NULL;
+  void (*xFree)(void *, void *) = NULL;
   void *xArg = NULL;
 
-  if (fromSys) {
+  if (heap == META_BEGIN_HEAP_OS) {
     xMalloc = tdbDefaultMalloc;
     xFree = tdbDefaultFree;
-  } else {
+  } else if (heap == META_BEGIN_HEAP_BUFFERPOOL) {
     xMalloc = metaMalloc;
     xFree = metaFree;
     xArg = pMeta->pVnode->inUse;
   }
+
   if (tdbBegin(pMeta->pEnv, &pMeta->txn, xMalloc, xFree, xArg, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
     return -1;
   }
diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c
index 6a4dcf6ead..974f8a9218 100644
--- a/source/dnode/vnode/src/meta/metaSnapshot.c
+++ b/source/dnode/vnode/src/meta/metaSnapshot.c
@@ -145,7 +145,7 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
   pWriter->sver = sver;
   pWriter->ever = ever;
 
-  metaBegin(pMeta, 1);
+  metaBegin(pMeta, META_BEGIN_HEAP_NIL);
 
   *ppWriter = pWriter;
   return code;
@@ -161,7 +161,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
   SMetaSnapWriter* pWriter = *ppWriter;
 
   if (rollback) {
-    ASSERT(0);
+    code = metaAbort(pWriter->pMeta);
+    if (code) goto _err;
   } else {
     code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
     if (code) goto _err;
diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c
index 5860ef72fd..4daab074b5 100644
--- a/source/dnode/vnode/src/vnd/vnodeCommit.c
+++ b/source/dnode/vnode/src/vnd/vnodeCommit.c
@@ -38,7 +38,7 @@ int vnodeBegin(SVnode *pVnode) {
   pVnode->state.commitID++;
 
   // begin meta
-  if (metaBegin(pVnode->pMeta, 0) < 0) {
+  if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) {
     vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
     return -1;
   }

From 66fba7ec601955b22e4181562352b16b541de5ff Mon Sep 17 00:00:00 2001
From: Minglei Jin
Date: Fri, 9 Dec 2022 16:28:26 +0800
Subject: [PATCH 04/11] meta/begin: a comment to describe the default heap option

---
 source/dnode/vnode/src/meta/metaCommit.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c
index de3abc2c8e..ac8d99ccf0 100644
--- a/source/dnode/vnode/src/meta/metaCommit.c
+++ b/source/dnode/vnode/src/meta/metaCommit.c
@@ -24,6 +24,7 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
   void (*xFree)(void *, void *) = NULL;
   void *xArg = NULL;
 
+  // default heap to META_BEGIN_HEAP_NIL
   if (heap == META_BEGIN_HEAP_OS) {
     xMalloc = tdbDefaultMalloc;
     xFree = tdbDefaultFree;

From 8bc39b7c90666fdb1767c1de1be690daa6f21c23 Mon Sep 17 00:00:00 2001
From: Hongze Cheng
Date: Fri, 9 Dec 2022 17:14:00 +0800
Subject: [PATCH 05/11] fix sem wait

---
 source/dnode/vnode/src/vnd/vnodeSnapshot.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
index 69c78e2d18..40705e553b 100644
--- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c
+++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c
@@ -382,7 +382,7 @@ _exit:
     vInfo("vgId:%d, vnode snapshot writer closed, rollback:%d", TD_VID(pVnode), rollback);
     taosMemoryFree(pWriter);
   }
-  tsem_wait(&pVnode->canCommit);
+  tsem_post(&pVnode->canCommit);
   return code;
 }
 

From 49bcb15fab2178e9b466c351fe8b725f06478f8d Mon Sep 17 00:00:00 2001
From: dapan1121
Date: Wed, 14 Dec 2022 16:34:56 +0800
Subject: [PATCH 06/11] enh: ignore head line in cvs file

---
 source/libs/parser/src/parInsertSql.c | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index 36420599b3..a6bead89aa 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -1411,6 +1411,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
   (*pNumOfRows) = 0;
   char*   pLine = NULL;
   int64_t readLen = 0;
+  bool    firstLine = (pStmt->fileProcessing == false);
   pStmt->fileProcessing = false;
   while (TSDB_CODE_SUCCESS == code && (readLen = taosGetLineFile(pStmt->fp, &pLine)) != -1) {
     if (('\r' == pLine[readLen - 1]) || ('\n' == pLine[readLen - 1])) {
@@ -1418,6 +1419,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
     }
 
     if (readLen == 0) {
+      firstLine = false;
       continue;
     }
 
@@ -1431,6 +1433,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
       strtolower(pLine, pLine);
       const char* pRow = pLine;
       code = parseOneRow(pCxt, (const char**)&pRow, pDataBuf, &gotRow, &token);
+      if (code && firstLine) {
+        firstLine = false;
+        code = 0;
+        continue;
+      }
     }
 
     if (TSDB_CODE_SUCCESS == code && gotRow) {
@@ -1442,6 +1449,8 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt,
       pStmt->fileProcessing = true;
       break;
     }
+
+    firstLine = false;
   }
 
   taosMemoryFree(pLine);

From 8aa20b531078e88bc9dd9b3141cad0bb2690eb36 Mon Sep 17 00:00:00 2001
From: Liu Jicong
Date: Sat, 17 Dec 2022 17:30:43 +0800
Subject: [PATCH 07/11] docs(stream)

---
 docs/en/12-taos-sql/14-stream.md | 30 +++++++++++++++++++++++++++++-
 docs/zh/12-taos-sql/14-stream.md | 29 +++++++++++++++++++++++++----
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md
index 17e4e4d1b0..8c81dcaeef 100644
--- a/docs/en/12-taos-sql/14-stream.md
+++ b/docs/en/12-taos-sql/14-stream.md
@@ -10,7 +10,7 @@ Because stream processing is built in to TDengine, you are no longer reliant on
 ## Create a Stream
 
 ```sql
-CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
+CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name SUBTABLE(expression) AS subquery
 stream_options: {
  TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
  WATERMARK  time
@@ -30,6 +30,8 @@ subquery: SELECT [DISTINCT] select_list
 
 Session windows, state windows, and sliding windows are supported. When you configure a session or state window for a supertable, you must use PARTITION BY TBNAME.
 
+Subtable Clause defines the naming rules of auto-created subtable, you can see more details in below part: Partitions of Stream.
+
 ```sql
 window_clause: {
     SESSION(ts_col, tol_val)
@@ -47,6 +49,32 @@ CREATE STREAM avg_vol_s INTO avg_vol AS
 SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
 ```
 
+## Partitions of Stream
+
+A Stream can process data in multiple partitions. Partition rules can be defined by PARTITION BY clause in stream processing. Each partition will have different timelines and windows, and will be processed separately and be written into different subtables of target supertable.
+
+If a stream is created without PARTITION BY clause, all data will be written into one subtable.
+
+If a stream is created with PARTITION BY clause without SUBTABLE clause, each partition will be given a random name.
+
+If a stream is created with PARTITION BY clause and SUBTABLE clause, the name of each partition will be calculated according to SUBTABLE clause. For example:
+
+```sql
+CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
+```
+
+IN PARTITION clause, 'tbname', representing each subtable name of source supertable, is given alias 'tname'. And 'tname' is used in SUBTABLE clause. In SUBTABLE clause, each auto created subtable will concat 'new-' and source subtable name as their name. Other expressions are also allowed in SUBTABLE clause, but the output type must be varchar.
+
+If the output length exceeds the limitation of TDengine(192), the name will be truncated. If the generated name is occupied by some other table, the creation and writing of the new subtable will be failed.
+
+## Filling history data
+
+Normally a stream does not process data already or being written into source table when it's being creating. But adding FILL_HISTORY 1 as a stream option when creating the stream will allow it to process data written before and while creating the stream. For example:
+
+```sql
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
+```
+
 ## Delete a Stream
 
 ```sql
diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md
index 932ad30b1a..1e6be13639 100644
--- a/docs/zh/12-taos-sql/14-stream.md
+++ b/docs/zh/12-taos-sql/14-stream.md
@@ -8,7 +8,7 @@ description: 流式计算的相关 SQL 的详细语法
 ## 创建流式计算
 
 ```sql
-CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
+CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name SUBTABLE(expression) AS subquery
 stream_options: {
  TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
  WATERMARK  time
@@ -28,6 +28,9 @@ subquery: SELECT select_list
 
 支持会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与partition by tbname一起使用
 
+
+subtable 子句定义了流式计算中创建的子表的命名规则,详见 流式计算的 partition 部分。
+
 ```sql
 window_clause: {
     SESSION(ts_col, tol_val)
@@ -49,11 +52,29 @@ SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(
 
 ## 流式计算的 partition
 
-可以使用 PARTITION BY TBNAME 或 PARTITION BY tag,对一个流进行多分区的计算,每个分区的时间线与时间窗口是独立的,会各自聚合,并写入到目的表中的不同子表。
+可以使用 PARTITION BY TBNAME,tag,普通列或者表达式,对一个流进行多分区的计算,每个分区的时间线与时间窗口是独立的,会各自聚合,并写入到目的表中的不同子表。
 
-不带 PARTITION BY 选项时,所有的数据将写入到一张子表。
+不带 PARTITION BY 子句时,所有的数据将写入到一张子表。
 
-流式计算创建的超级表有唯一的 tag 列 groupId,每个 partition 会被分配唯一 groupId。与 schemaless 写入一致,我们通过 MD5 计算子表名,并自动创建它。
+在创建流时不使用 SUBTABLE 子句时,流式计算创建的超级表有唯一的 tag 列 groupId,每个 partition 会被分配唯一 groupId。与 schemaless 写入一致,我们通过 MD5 计算子表名,并自动创建它。
+
+若创建流的语句中包含 SUBTABLE 子句,用户可以为每个 partition 对应的子表生成自定义的表名,例如:
+
+```sql
+CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
+```
+
+PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名。
+
+注意,子表名的长度若超过 TDengine 的限制,将被截断。若要生成的子表名已经存在于另一超级表,由于 TDengine 的子表名是唯一的,因此对应新子表的创建以及数据的写入将会失败。
+
+## 流式计算读取历史数据
+
+正常情况下,流式计算不会处理创建前已经写入源表中的数据,若要处理已经写入的数据,可以在创建流时设置 fill_history 1 选项,这样创建的流式计算会自动处理创建前、创建中、创建后写入的数据。例如:
+
+```sql
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
+```
 
 ## 删除流式计算

From 191c43654dfe0575173b0dd88bc1159c56aefc1c Mon Sep 17 00:00:00 2001
From: Liu Jicong
Date: Sat, 17 Dec 2022 18:08:01 +0800
Subject: [PATCH 08/11] docs(stream) (#18988)

---
 docs/en/12-taos-sql/14-stream.md | 30 +++++++++++++++++++++++++++++-
 docs/zh/12-taos-sql/14-stream.md | 29 +++++++++++++++++++++++++----
 2 files changed, 54 insertions(+), 5 deletions(-)

diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md
index 17e4e4d1b0..8c81dcaeef 100644
--- a/docs/en/12-taos-sql/14-stream.md
+++ b/docs/en/12-taos-sql/14-stream.md
@@ -10,7 +10,7 @@ Because stream processing is built in to TDengine, you are no longer reliant on
 ## Create a Stream
 
 ```sql
-CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
+CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name SUBTABLE(expression) AS subquery
 stream_options: {
  TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
  WATERMARK  time
@@ -30,6 +30,8 @@ subquery: SELECT [DISTINCT] select_list
 
 Session windows, state windows, and sliding windows are supported. When you configure a session or state window for a supertable, you must use PARTITION BY TBNAME.
 
+Subtable Clause defines the naming rules of auto-created subtable, you can see more details in below part: Partitions of Stream.
+
 ```sql
 window_clause: {
     SESSION(ts_col, tol_val)
@@ -47,6 +49,32 @@ CREATE STREAM avg_vol_s INTO avg_vol AS
 SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
 ```
 
+## Partitions of Stream
+
+A Stream can process data in multiple partitions. Partition rules can be defined by PARTITION BY clause in stream processing. Each partition will have different timelines and windows, and will be processed separately and be written into different subtables of target supertable.
+
+If a stream is created without PARTITION BY clause, all data will be written into one subtable.
+
+If a stream is created with PARTITION BY clause without SUBTABLE clause, each partition will be given a random name.
+
+If a stream is created with PARTITION BY clause and SUBTABLE clause, the name of each partition will be calculated according to SUBTABLE clause. For example:
+
+```sql
+CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
+```
+
+IN PARTITION clause, 'tbname', representing each subtable name of source supertable, is given alias 'tname'. And 'tname' is used in SUBTABLE clause. In SUBTABLE clause, each auto created subtable will concat 'new-' and source subtable name as their name. Other expressions are also allowed in SUBTABLE clause, but the output type must be varchar.
+
+If the output length exceeds the limitation of TDengine(192), the name will be truncated. If the generated name is occupied by some other table, the creation and writing of the new subtable will be failed.
+
+## Filling history data
+
+Normally a stream does not process data already or being written into source table when it's being creating. But adding FILL_HISTORY 1 as a stream option when creating the stream will allow it to process data written before and while creating the stream. For example:
+
+```sql
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
+```
+
 ## Delete a Stream
 
 ```sql
diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md
index 932ad30b1a..1e6be13639 100644
--- a/docs/zh/12-taos-sql/14-stream.md
+++ b/docs/zh/12-taos-sql/14-stream.md
@@ -8,7 +8,7 @@ description: 流式计算的相关 SQL 的详细语法
 ## 创建流式计算
 
 ```sql
-CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
+CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name SUBTABLE(expression) AS subquery
 stream_options: {
  TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
  WATERMARK  time
@@ -28,6 +28,9 @@ subquery: SELECT select_list
 
 支持会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与partition by tbname一起使用
 
+
+subtable 子句定义了流式计算中创建的子表的命名规则,详见 流式计算的 partition 部分。
+
 ```sql
 window_clause: {
     SESSION(ts_col, tol_val)
@@ -49,11 +52,29 @@ SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(
 
 ## 流式计算的 partition
 
-可以使用 PARTITION BY TBNAME 或 PARTITION BY tag,对一个流进行多分区的计算,每个分区的时间线与时间窗口是独立的,会各自聚合,并写入到目的表中的不同子表。
+可以使用 PARTITION BY TBNAME,tag,普通列或者表达式,对一个流进行多分区的计算,每个分区的时间线与时间窗口是独立的,会各自聚合,并写入到目的表中的不同子表。
 
-不带 PARTITION BY 选项时,所有的数据将写入到一张子表。
+不带 PARTITION BY 子句时,所有的数据将写入到一张子表。
 
-流式计算创建的超级表有唯一的 tag 列 groupId,每个 partition 会被分配唯一 groupId。与 schemaless 写入一致,我们通过 MD5 计算子表名,并自动创建它。
+在创建流时不使用 SUBTABLE 子句时,流式计算创建的超级表有唯一的 tag 列 groupId,每个 partition 会被分配唯一 groupId。与 schemaless 写入一致,我们通过 MD5 计算子表名,并自动创建它。
+
+若创建流的语句中包含 SUBTABLE 子句,用户可以为每个 partition 对应的子表生成自定义的表名,例如:
+
+```sql
+CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
+```
+
+PARTITION 子句中,为 tbname 定义了一个别名 tname, 在PARTITION 子句中的别名可以用于 SUBTABLE 子句中的表达式计算,在上述示例中,流新创建的子表将以前缀 'new-' 连接原表名作为表名。
+
+注意,子表名的长度若超过 TDengine 的限制,将被截断。若要生成的子表名已经存在于另一超级表,由于 TDengine 的子表名是唯一的,因此对应新子表的创建以及数据的写入将会失败。
+
+## 流式计算读取历史数据
+
+正常情况下,流式计算不会处理创建前已经写入源表中的数据,若要处理已经写入的数据,可以在创建流时设置 fill_history 1 选项,这样创建的流式计算会自动处理创建前、创建中、创建后写入的数据。例如:
+
+```sql
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
+```
 
 ## 删除流式计算

From 726951fef05bfe737353b53faf34996018e27938 Mon Sep 17 00:00:00 2001
From: Liu Jicong
Date: Sat, 17 Dec 2022 18:27:23 +0800
Subject: [PATCH 09/11] docs(stream)

---
 docs/en/12-taos-sql/14-stream.md | 14 ++++++++++++++
 docs/zh/12-taos-sql/14-stream.md | 14 ++++++++++++++
 2 files changed, 28 insertions(+)

diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md
index 8c81dcaeef..bd1858d93f 100644
--- a/docs/en/12-taos-sql/14-stream.md
+++ b/docs/en/12-taos-sql/14-stream.md
@@ -75,6 +75,20 @@ Normally a stream does not process data already or being written into source tab
 create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
 ```
 
+Combining fill_history option and where clause, stream can processing data of specific time range. For example, only process data after a past time. (In this case, 2020-01-30)
+
+```sql
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > timestamp '2020-01-30' interval(10s)
+```
+
+As another example, only processing data starting from some past time, and ending at some future time.
+
+```sql
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > timestamp '2020-01-30' and ts < timestamp '2023-01-01 '> interval(10s)
+```
+
+If some streams are totally outdated, and you do not want it to monitor or process anymore, those streams can be manually dropped and output data will be still kept.
+
 ## Delete a Stream
 
 ```sql
diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md
index 1e6be13639..6c18c41f0c 100644
--- a/docs/zh/12-taos-sql/14-stream.md
+++ b/docs/zh/12-taos-sql/14-stream.md
@@ -76,6 +76,20 @@ create stream if not exists s1 fill_history 1 into st1 as select count(*) from
 create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 interval(10s)
 ```
 
+结合 fill_history 1 选项,可以实现只处理特定历史时间范围的数据,例如:只处理某历史时刻(2020年1月30日)之后的数据
+
+```sql
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > timestamp '2020-01-30' interval(10s)
+```
+
+再如,仅处理某时间段内的数据,结束时间可以是未来时间
+
+```sql
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > timestamp '2020-01-30' and ts < timestamp '2023-01-01 '> interval(10s)
+```
+
+如果该流任务已经彻底过期,并且您不再想让它检测或处理数据,您可以手动删除它,被计算出的数据仍会被保留。
+
 ## 删除流式计算
 
 ```sql

From ead5d8edf21317228cbdab4e18942c269759e2eb Mon Sep 17 00:00:00 2001
From: Shengliang Guan
Date: Sat, 17 Dec 2022 18:34:29 +0800
Subject: [PATCH 10/11] fix: reduce drop dnode speed makes data more secure

---
 include/common/tmsg.h                   |  2 +-
 source/dnode/mnode/impl/src/mndVgroup.c | 10 ++++++++-
 source/libs/sync/src/syncMain.c         |  2 +-
 source/libs/sync/src/syncRespMgr.c      | 30 ++++++++++++++++++-------
 4 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index bb1addf1b6..ad6077db09 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -68,7 +68,7 @@ typedef uint16_t tmsg_t;
 
 static inline bool vnodeIsMsgBlock(tmsg_t type) {
   return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
-         (type == TDMT_VND_UPDATE_TAG_VAL);
+         (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
 }
 
 static inline bool syncUtilUserCommit(tmsg_t msgType) {
diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c
index 4a6f0d14da..31ab1f3259 100644
--- a/source/dnode/mnode/impl/src/mndVgroup.c
+++ b/source/dnode/mnode/impl/src/mndVgroup.c
@@ -1126,8 +1126,12 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
   }
 
   if (!force) {
+#if 1
+    {
+#else
     if (newVg.replica == 1) {
-      mInfo("vgId:%d, will add 1 vnode, replca:1", pVgroup->vgId);
+#endif
+      mInfo("vgId:%d, will add 1 vnode, replca:%d", pVgroup->vgId, newVg.replica);
       if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
       for (int32_t i = 0; i < newVg.replica - 1; ++i) {
         if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
       }
@@ -1155,6 +1159,9 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
         if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
       }
       if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
+#if 1
+    }
+#else
     } else {  // new replica == 3
       mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
       if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
@@ -1181,6 +1188,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
       if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
       if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
     }
+#endif
   } else {
     mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
     if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index d6ce77193a..6a545424fc 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -233,7 +233,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
     rpcSendResponse(&rpcMsg);
     return 0;
   } else {
-    sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq);
+    sError("no message handle to send timeout response, seq:%" PRId64, seq);
     return -1;
   }
 }
diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c
index 049b02d73e..79a38cad7a 100644
--- a/source/libs/sync/src/syncRespMgr.c
+++ b/source/libs/sync/src/syncRespMgr.c
@@ -35,11 +35,16 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
   pObj->seqNum = 0;
   taosThreadMutexInit(&(pObj->mutex), NULL);
 
+  SSyncNode *pNode = pObj->data;
+  sTrace("vgId:%d, create resp manager", pNode->vgId);
   return pObj;
 }
 
 void syncRespMgrDestroy(SSyncRespMgr *pObj) {
   if (pObj != NULL) {
+    SSyncNode *pNode = pObj->data;
+    sTrace("vgId:%d, destroy resp manager", pNode->vgId);
+
     taosThreadMutexLock(&pObj->mutex);
     taosHashCleanup(pObj->pRespHash);
     taosThreadMutexUnlock(&pObj->mutex);
@@ -81,6 +86,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {
 
     taosThreadMutexUnlock(&pObj->mutex);
     return 1;  // get one object
+  } else {
+    sNError(pObj->data, "get message handle, no object of seq:%" PRIu64, seq);
   }
 
   taosThreadMutexUnlock(&pObj->mutex);
@@ -99,6 +106,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p
 
     taosThreadMutexUnlock(&pObj->mutex);
     return 1;  // get one object
+  } else {
+    sNError(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq);
   }
 
   taosThreadMutexUnlock(&pObj->mutex);
@@ -114,7 +123,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
   SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
   if (delIndexArray == NULL) return;
 
-  sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
+  sDebug("vgId:%d, resp manager begin clean by ttl", pSyncNode->vgId);
   while (pStub) {
     size_t len;
     void  *key = taosHashGetKey(pStub, &len);
@@ -143,34 +152,39 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
 
       // TODO: and make rpcMsg body, call commit cb
       // pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
-
-      pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
-      if (pStub->rpcMsg.info.handle != NULL) {
-        tmsgSendRsp(&pStub->rpcMsg);
-      }
+      SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
+      sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pSyncNode->vgId, rpcMsg.info.handle,
+            TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
+      rpcSendResponse(&rpcMsg);
     }
 
     pStub = taosHashIterate(pObj->pRespHash, pStub);
   }
 
   int32_t arraySize = taosArrayGetSize(delIndexArray);
-  sDebug("vgId:%d, resp mgr end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
+  sDebug("vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
 
   for (int32_t i = 0; i < arraySize; ++i) {
     uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
     taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
-    sDebug("vgId:%d, resp mgr clean by ttl, seq:%" PRId64 "", pSyncNode->vgId, *pSeqNum);
+    sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pSyncNode->vgId, *pSeqNum);
  }
  taosArrayDestroy(delIndexArray);
 }
 
 void syncRespCleanRsp(SSyncRespMgr *pObj) {
+  SSyncNode *pNode = pObj->data;
+  sTrace("vgId:%d, clean all rsp", pNode->vgId);
+
   taosThreadMutexLock(&pObj->mutex);
   syncRespCleanByTTL(pObj, -1, true);
   taosThreadMutexUnlock(&pObj->mutex);
 }
 
 void syncRespClean(SSyncRespMgr *pObj) {
+  SSyncNode *pNode = pObj->data;
+  sTrace("vgId:%d, clean rsp by ttl", pNode->vgId);
+
   taosThreadMutexLock(&pObj->mutex);
   syncRespCleanByTTL(pObj, pObj->ttl, false);
   taosThreadMutexUnlock(&pObj->mutex);

From ab9c07c4200d29d16af765974562652ea88df8f7 Mon Sep 17 00:00:00 2001
From: Liu Jicong
Date: Sat, 17 Dec 2022 19:11:43 +0800
Subject: [PATCH 11/11] docs(stream)

---
 docs/en/12-taos-sql/14-stream.md | 4 ++--
 docs/zh/12-taos-sql/14-stream.md | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md
index 07db5f4877..c47d2da0eb 100644
--- a/docs/en/12-taos-sql/14-stream.md
+++ b/docs/en/12-taos-sql/14-stream.md
@@ -78,13 +78,13 @@ create stream if not exists s1 fill_history 1 into st1 as select count(*) from
 Combining fill_history option and where clause, stream can processing data of specific time range. For example, only process data after a past time. (In this case, 2020-01-30)
 
 ```sql
-create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > timestamp '2020-01-30' interval(10s)
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' interval(10s)
 ```
 
 As another example, only processing data starting from some past time, and ending at some future time.
 
 ```sql
-create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > timestamp '2020-01-30' and ts < timestamp '2023-01-01 '> interval(10s)
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' and ts < '2023-01-01' interval(10s)
 ```
 
 If some streams are totally outdated, and you do not want it to monitor or process anymore, those streams can be manually dropped and output data will be still kept.
diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md
index 6c18c41f0c..a70d559a86 100644
--- a/docs/zh/12-taos-sql/14-stream.md
+++ b/docs/zh/12-taos-sql/14-stream.md
@@ -79,13 +79,13 @@ create stream if not exists s1 fill_history 1 into st1 as select count(*) from
 结合 fill_history 1 选项,可以实现只处理特定历史时间范围的数据,例如:只处理某历史时刻(2020年1月30日)之后的数据
 
 ```sql
-create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > timestamp '2020-01-30' interval(10s)
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' interval(10s)
 ```
 
 再如,仅处理某时间段内的数据,结束时间可以是未来时间
 
 ```sql
-create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > timestamp '2020-01-30' and ts < timestamp '2023-01-01 '> interval(10s)
+create stream if not exists s1 fill_history 1 into st1 as select count(*) from t1 where ts > '2020-01-30' and ts < '2023-01-01' interval(10s)
 ```
 
 如果该流任务已经彻底过期,并且您不再想让它检测或处理数据,您可以手动删除它,被计算出的数据仍会被保留。
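
For readers tracing the SNAP_DATA_CFG framing introduced in patch 01, the following is a minimal, self-contained sketch of a receiver for that payload. The struct layout and the SNAP_DATA_CFG value below are assumptions reconstructed from how the hunks above use the header fields (`type`, `size`, flexible `data[]`); the real definitions live elsewhere in the TDengine tree, and error handling is simplified.

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// Assumed layout for this sketch only; the real SSnapDataHdr is defined in
// the TDengine source tree and may carry additional fields.
typedef struct {
  int8_t  type;
  int64_t size;    // payload bytes, including the trailing '\0' the writer appends
  uint8_t data[];  // for SNAP_DATA_CFG: the NUL-terminated vnode.json blob
} SSnapDataHdr;

#define SNAP_DATA_CFG 1  // placeholder value for this sketch only

// Receiver-side check mirroring vnodeSnapWriteInfo(): validate the frame,
// then hand the JSON text to a decoder (here we just print it).
static int snapHandleCfg(const uint8_t *pData, uint32_t nData) {
  if (nData < sizeof(SSnapDataHdr)) return -1;
  const SSnapDataHdr *pHdr = (const SSnapDataHdr *)pData;
  if (pHdr->type != SNAP_DATA_CFG) return -1;
  if (pHdr->size <= 0 || sizeof(*pHdr) + (uint64_t)pHdr->size > nData) return -1;
  if (pHdr->data[pHdr->size - 1] != '\0') return -1;  // writer appended '\0'
  printf("vnode.json payload: %s\n", (const char *)pHdr->data);
  return 0;
}

int main(void) {
  // Build a frame the same way vnodeSnapRead() does: header + json + '\0'.
  const char *json = "{\"config\":{}}";
  int64_t     size = (int64_t)strlen(json);
  SSnapDataHdr *pHdr = malloc(sizeof(*pHdr) + size + 1);
  pHdr->type = SNAP_DATA_CFG;
  pHdr->size = size + 1;
  memcpy(pHdr->data, json, size);
  pHdr->data[size] = '\0';
  int rc = snapHandleCfg((const uint8_t *)pHdr, (uint32_t)(sizeof(*pHdr) + size + 1));
  free(pHdr);
  return rc;
}
```

The trailing '\0' matters because vnodeSnapWriteInfo() passes pHdr->data straight to vnodeDecodeInfo(), whose tjsonParse() expects a NUL-terminated C string.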