From b913c0b94b211df032ae90e43c6c31ca58e4eb48 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 19 Aug 2024 16:51:39 +0800 Subject: [PATCH 1/4] fix: async fetch rows no callback issue --- source/client/src/clientImpl.c | 2 +- source/libs/scheduler/src/schJob.c | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 664de5619f..a8f6239906 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2928,7 +2928,7 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req); if (TSDB_CODE_SUCCESS != code) { tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId); - pRequest->body.fetchFp(param, pRequest, code); + //pRequest->body.fetchFp(param, pRequest, code); } } diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 58a0706223..9c9cce7516 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -519,7 +519,7 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) { goto _return; } - if (op && pJob->opStatus.op != op) { + if (SCH_OP_NULL != op && pJob->opStatus.op != op) { SCH_JOB_ELOG("job in operation %s mis-match with expected %s", schGetOpStr(pJob->opStatus.op), schGetOpStr(op)); goto _return; } @@ -547,9 +547,10 @@ _return: int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { + schPostJobRes(pJob, 0); return TSDB_CODE_SCH_IGNORE_ERROR; } - + schUpdateJobErrCode(pJob, errCode); int32_t code = atomic_load_32(&pJob->errCode); From 720ab674312e0811d32c724b390b9300f91311a0 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 22 Aug 2024 17:28:05 +0800 Subject: [PATCH 2/4] enh: clear useless asserts --- source/common/src/tdataformat.c | 10 +++--- source/dnode/vnode/src/tsdb/tsdbSnapInfo.c | 27 +++++---------- source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c | 11 ------ source/dnode/vnode/src/tsdb/tsdbUtil.c | 34 +++++++++---------- source/dnode/vnode/src/vnd/vnodeCfg.c | 1 - source/dnode/vnode/src/vnd/vnodeSnapshot.c | 15 ++++---- source/dnode/vnode/src/vnd/vnodeSvr.c | 14 ++++++-- source/dnode/vnode/src/vnd/vnodeSync.c | 12 ++++--- source/util/src/tlrucache.c | 29 ---------------- 9 files changed, 56 insertions(+), 97 deletions(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index f726a5430e..1df68b5389 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1403,7 +1403,6 @@ void tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) { pVal->val = pSrc->pks[i].val; } else { pVal->nData = pSrc->pks[i].nData; - ASSERT(pSrc->pks[i].pData != NULL); (void)memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData); } } @@ -2517,7 +2516,7 @@ static FORCE_INLINE int32_t tColDataUpdateValue70(SColData *pColData, uint8_t *p return tColDataPutValue(pColData, pData, nData); } } else { - ASSERT(0); + return TSDB_CODE_INVALID_PARA; } return 0; } @@ -2760,7 +2759,9 @@ int32_t tColDataCompress(SColData *colData, SColDataCompressInfo *info, SBuffer int32_t code; SBuffer local; - ASSERT(colData->nVal > 0); + if (!(colData->nVal > 0)) { + return TSDB_CODE_INVALID_PARA; + } (*info) = (SColDataCompressInfo){ .cmprAlg = info->cmprAlg, @@ -3217,7 +3218,6 @@ void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SR for (int i = 1; i < nColData; i++) { if (aColData[i].cflag & COL_IS_KEY) { - ASSERT(aColData->flag == HAS_VALUE); tColDataGetValue4(&aColData[i], iRow, &cv); key->pks[key->numOfPKs++] = cv.value; } else { @@ -3379,8 +3379,6 @@ static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd / iv < (pColData->nVal - 1) ? pColData->aOffset[iv + 1] - pColData->aOffset[iv] : pColData->nData - pColData->aOffset[iv]); } - // TODO - ASSERT(0); } else { if (iv != iStart) { (void)memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart], diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c index decf276bc6..4a73f53c77 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c @@ -92,7 +92,6 @@ static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition** for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { if (fset->farr[ftype] == NULL) continue; typ = tsdbFTypeToFRangeType(ftype); - ASSERT(typ < TSDB_FSET_RANGE_TYP_MAX); STFile* f = fset->farr[ftype]->f; if (f->maxVer > fset->maxVerValid) { corrupt = true; @@ -103,8 +102,7 @@ static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition** } count++; SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; - code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); - ASSERT(code == 0); + (void)TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); } typ = TSDB_FSET_RANGE_TYP_STT; @@ -122,14 +120,12 @@ static int32_t tsdbTFileSetToFSetPartition(STFileSet* fset, STsdbFSetPartition** } count++; SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; - code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); - ASSERT(code == 0); + (void)TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); } } if (corrupt && count == 0) { SVersionRange vr = {.minVer = VERSION_MIN, .maxVer = fset->maxVerValid}; - code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); - ASSERT(code == 0); + (void)TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); } ppSP[0] = p; return 0; @@ -186,8 +182,7 @@ int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList* pList, TFileSetRangeArray r->sver = maxVerValid + 1; r->ever = VERSION_MAX; tsdbDebug("range diff fid:%" PRId64 ", sver:%" PRId64 ", ever:%" PRId64, part->fid, r->sver, r->ever); - int32_t code = TARRAY2_SORT_INSERT(pDiff, r, tsdbTFileSetRangeCmprFn); - ASSERT(code == 0); + (void)TARRAY2_SORT_INSERT(pDiff, r, tsdbTFileSetRangeCmprFn); } ppRanges[0] = pDiff; @@ -360,9 +355,7 @@ static STsdbFSetPartList* tsdbSnapGetFSetPartList(STFileSystem* fs) { terrno = code; break; } - ASSERT(pItem != NULL); - code = TARRAY2_SORT_INSERT(pList, pItem, tsdbFSetPartCmprFn); - ASSERT(code == 0); + (void)TARRAY2_SORT_INSERT(pList, pItem, tsdbFSetPartCmprFn); } (void)taosThreadMutexUnlock(&fs->tsdb->mutex); @@ -400,7 +393,9 @@ static int32_t tsdbPartitionInfoInit(SVnode* pVnode, STsdbPartitionInfo* pInfo) pInfo->vgId = TD_VID(pVnode); pInfo->tsdbMaxCnt = (!VND_IS_RSMA(pVnode) ? 1 : TSDB_RETENTION_MAX); - ASSERT(sizeof(pInfo->subTyps) == sizeof(subTyps)); + if (!(sizeof(pInfo->subTyps) == sizeof(subTyps))) { + return TSDB_CODE_INVALID_PARA; + } memcpy(pInfo->subTyps, (char*)subTyps, sizeof(subTyps)); // fset partition list @@ -437,8 +432,7 @@ static int32_t tsdbPartitionInfoSerialize(STsdbPartitionInfo* pInfo, uint8_t* bu for (int32_t j = 0; j < pInfo->tsdbMaxCnt; ++j) { SSyncTLV* pSubHead = (void*)((char*)buf + offset); int32_t valOffset = offset + sizeof(*pSubHead); - ASSERT(pSubHead->val == (char*)buf + valOffset); - int32_t code = tSerializeTsdbFSetPartList(pSubHead->val, bufLen - valOffset, pInfo->pLists[j], &tlen); + int32_t code = tSerializeTsdbFSetPartList(pSubHead->val, bufLen - valOffset, pInfo->pLists[j], &tlen); if (code) { tsdbError("vgId:%d, failed to serialize fset partition list of tsdb %d since %s", pInfo->vgId, j, terrstr()); return code; @@ -574,7 +568,6 @@ static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, ST } int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { - ASSERT(pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY); STsdbPartitionInfo partitionInfo = {0}; int code = 0; STsdbPartitionInfo* pInfo = &partitionInfo; @@ -616,7 +609,6 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { goto _out; } offset += tlen; - ASSERT(offset <= bufLen); if ((tlen = tsdbRepOptsSerialize(&opts, buf + offset, bufLen - offset)) < 0) { code = tlen; @@ -624,7 +616,6 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { goto _out; } offset += tlen; - ASSERT(offset <= bufLen); // set header of info data SSyncTLV* pHead = pSnap->data; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c index 5b69638b36..55ddf64421 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshotRAW.c @@ -170,11 +170,8 @@ static int64_t tsdbSnapRAWReadPeek(SDataFileRAWReader* reader) { } static SDataFileRAWReader* tsdbSnapRAWReaderIterNext(STsdbSnapRAWReader* reader) { - ASSERT(reader->dataIter->idx <= reader->dataIter->count); - while (reader->dataIter->idx < reader->dataIter->count) { SDataFileRAWReader* dataReader = TARRAY2_GET(reader->dataReaderArr, reader->dataIter->idx); - ASSERT(dataReader); if (dataReader->ctx->offset < dataReader->config->file.size) { return dataReader; } @@ -196,7 +193,6 @@ static int32_t tsdbSnapRAWReadNext(STsdbSnapRAWReader* reader, SSnapDataHdr** pp // prepare int64_t dataLength = tsdbSnapRAWReadPeek(dataReader); - ASSERT(dataLength > 0); void* pBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + sizeof(STsdbDataRAWBlockHeader) + dataLength); if (pBuf == NULL) { @@ -217,7 +213,6 @@ static int32_t tsdbSnapRAWReadNext(STsdbSnapRAWReader* reader, SSnapDataHdr** pp // finish dataReader->ctx->offset += pBlock->dataLength; - ASSERT(dataReader->ctx->offset <= dataReader->config->file.size); ppData[0] = pBuf; _exit: @@ -247,8 +242,6 @@ static int32_t tsdbSnapRAWReadBegin(STsdbSnapRAWReader* reader) { int32_t code = 0; int32_t lino = 0; - ASSERT(reader->ctx->fset == NULL); - if (reader->ctx->fsetArrIdx < TARRAY2_SIZE(reader->fsetArr)) { reader->ctx->fset = TARRAY2_GET(reader->fsetArr, reader->ctx->fsetArrIdx++); reader->ctx->isDataDone = false; @@ -409,8 +402,6 @@ static int32_t tsdbSnapRAWWriteFileSetBegin(STsdbSnapRAWWriter* writer, int32_t int32_t code = 0; int32_t lino = 0; - ASSERT(writer->ctx->fsetWriteBegin == false); - STFileSet* fset = &(STFileSet){.fid = fid}; writer->ctx->fid = fid; @@ -555,8 +546,6 @@ _exit: } int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* writer, SSnapDataHdr* hdr) { - ASSERT(hdr->type == SNAP_DATA_RAW); - int32_t code = 0; int32_t lino = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index f70b0aad26..8820a026e9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -403,8 +403,6 @@ static const int32_t BLOCK_WITH_ALG_VER = 2; int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) { int32_t code; - ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); - if ((code = tBufferPutI16v(buffer, pBlockCol->cid))) return code; if ((code = tBufferPutI8(buffer, pBlockCol->type))) return code; if ((code = tBufferPutI8(buffer, pBlockCol->cflag))) return code; @@ -443,8 +441,6 @@ int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint3 if ((code = tBufferGetI8(br, &pBlockCol->flag))) return code; if ((code = tBufferGetI32v(br, &pBlockCol->szOrigin))) return code; - ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); - pBlockCol->szBitmap = 0; pBlockCol->szOffset = 0; pBlockCol->szValue = 0; @@ -647,7 +643,6 @@ void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key) { if (pColData->cflag & COL_IS_KEY) { SColVal cv; tColDataGetValue(pColData, irow, &cv); - ASSERT(COL_VAL_IS_VALUE(&cv)); key->pks[key->numOfPKs] = cv.value; key->numOfPKs++; } else { @@ -748,8 +743,6 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) jCol = 0; pTColumn = &pTSchema->columns[jCol++]; - ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); - *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = key.ts})); if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { code = terrno; @@ -800,8 +793,6 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) pMerger->version = key.version; return 0; } else { - ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts); - for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) { pTColumn = &pMerger->pTSchema->columns[iCol]; if (pTSchema->columns[jCol].colId < pTColumn->colId) { @@ -852,7 +843,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) } } } else { - ASSERT(0 && "dup versions not allowed"); + return TSDB_CODE_INVALID_PARA; } } @@ -1164,7 +1155,9 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, while (pTColumn->colId < aCid[iCid]) { iColumn++; - ASSERT(iColumn < pTSchema->numOfCols); + if (!(iColumn < pTSchema->numOfCols)) { + return TSDB_CODE_INVALID_PARA; + } pTColumn = &pTSchema->columns[iColumn]; } @@ -1272,7 +1265,9 @@ _exit: int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { int32_t code = 0; - ASSERT(pBlockData->suid || pBlockData->uid); + if (!(pBlockData->suid || pBlockData->uid)) { + return TSDB_CODE_INVALID_PARA; + } // uid if (pBlockData->uid == 0) { @@ -1299,7 +1294,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */); if (code) goto _exit; } else { - ASSERT(0); + return TSDB_CODE_INVALID_PARA; } pBlockData->nRow++; @@ -1357,7 +1352,6 @@ int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS #endif SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) { - ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); int32_t lidx = 0; int32_t ridx = pBlockData->nColData - 1; @@ -1627,7 +1621,9 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S // primary keys for (hdr->numOfPKs = 0; hdr->numOfPKs < bData->nColData; hdr->numOfPKs++) { - ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS); + if (!(hdr->numOfPKs <= TD_MAX_PK_COLS)) { + return TSDB_CODE_INVALID_PARA; + } SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs]; SColData *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs); @@ -1762,8 +1758,12 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, for (int i = 0; i < hdr->numOfPKs; i++) { const SBlockCol *blockCol = &hdr->primaryBlockCols[i]; - ASSERT(blockCol->flag == HAS_VALUE); - ASSERT(blockCol->cflag & COL_IS_KEY); + if (!(blockCol->flag == HAS_VALUE)) { + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit); + } + if (!(blockCol->cflag & COL_IS_KEY)) { + TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit); + } code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index e2791d8a00..d90badc34c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -248,7 +248,6 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { } for (int32_t i = 0; i < nRetention; ++i) { SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i); - ASSERT(pNodeRetention != NULL); tjsonGetNumberValue(pNodeRetention, "freq", (pCfg->tsdbCfg.retentions)[i].freq, code); if (code) return code; tjsonGetNumberValue(pNodeRetention, "freqUnit", (pCfg->tsdbCfg.retentions)[i].freqUnit, code); diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 28e7ae97ca..9a5b98e31b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -73,7 +73,10 @@ struct SVSnapReader { }; static TFileSetRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) { - ASSERTS(sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) == 2, "Unexpected array size"); + if (!(sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) == 2)) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } switch (tsdbTyp) { case SNAP_DATA_TSDB: return &pReader->pRanges; @@ -147,7 +150,6 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP pReader->tsdbRAWDone = true; } - ASSERT(pReader->tsdbDone != pReader->tsdbRAWDone); vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode), (pReader->tsdbDone ? "raw" : "normal")); } @@ -177,7 +179,6 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader // open tsdb snapshot raw reader if (!pReader->tsdbRAWDone) { - ASSERT(pReader->sver == 0); code = tsdbSnapRAWReaderOpen(pVnode->pTsdb, ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); if (code) goto _exit; } @@ -339,7 +340,6 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (!pReader->tsdbRAWDone) { // open if not if (pReader->pTsdbRAWReader == NULL) { - ASSERT(pReader->sver == 0); code = tsdbSnapRAWReaderOpen(pReader->pVnode->pTsdb, pReader->ever, SNAP_DATA_RAW, &pReader->pTsdbRAWReader); TSDB_CHECK_CODE(code, lino, _exit); } @@ -518,7 +518,6 @@ struct SVSnapWriter { }; TFileSetRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) { - ASSERTS(sizeof(pWriter->pRsmaRanges) / sizeof(pWriter->pRsmaRanges[0]) == 2, "Unexpected array size"); switch (tsdbTyp) { case SNAP_DATA_TSDB: return &pWriter->pRanges; @@ -674,7 +673,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * // commit json if (!rollback) { - ASSERT(pVnode->config.vgId == pWriter->info.config.vgId); pWriter->info.state.committed = pWriter->ever; pVnode->config = pWriter->info.config; pVnode->state = (SVState){.committed = pWriter->info.state.committed, @@ -794,7 +792,9 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SVnode *pVnode = pWriter->pVnode; - ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData); + if (!(pHdr->size + sizeof(SSnapDataHdr) == nData)) { + return TSDB_CODE_INVALID_PARA; + } if (pHdr->index != pWriter->index + 1) { vError("vgId:%d, unexpected vnode snapshot msg. index:%" PRId64 ", expected index:%" PRId64, TD_VID(pVnode), @@ -837,7 +837,6 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { case SNAP_DATA_RAW: { // tsdb if (pWriter->pTsdbSnapRAWWriter == NULL) { - ASSERT(pWriter->sver == 0); code = tsdbSnapRAWWriterOpen(pVnode->pTsdb, pWriter->ever, &pWriter->pTsdbSnapRAWWriter); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index aae3da7ec4..75db6e2925 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -540,8 +540,13 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg TD_VID(pVnode), TMSG_INFO(pMsg->msgType), ver, pVnode->state.applied, pVnode->state.applyTerm, pMsg->info.conn.applyTerm); - ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm); - ASSERTS(pVnode->state.applied + 1 == ver, "applied:%" PRId64 ", ver:%" PRId64, pVnode->state.applied, ver); + if (!(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm)) { + return terrno = TSDB_CODE_INTERNAL_ERROR; + } + + if (!(pVnode->state.applied + 1 == ver)) { + return terrno = TSDB_CODE_INTERNAL_ERROR; + } atomic_store_64(&pVnode->state.applied, ver); atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm); @@ -981,7 +986,10 @@ static int32_t vnodeProcessFetchTtlExpiredTbs(SVnode *pVnode, int64_t ver, void goto _end; } - ASSERT(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids)); + if (!(ttlReq.nUids == taosArrayGetSize(ttlReq.pTbUids))) { + terrno = TSDB_CODE_INVALID_MSG; + goto _end; + } tb_uid_t suid; char ctbName[TSDB_TABLE_NAME_LEN]; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 31e44f5912..8c41ceba57 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -117,7 +117,9 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq); bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType)); if (wait) { - ASSERT(!pVnode->blocked); + if (pVnode->blocked) { + return TSDB_CODE_INTERNAL_ERROR; + } pVnode->blocked = true; pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSeq = seq; @@ -175,7 +177,6 @@ static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize); bool wait = (code == 0 && vnodeIsBlockMsg(pLastMsg->msgType)); if (wait) { - ASSERT(!pVnode->blocked); pVnode->blocked = true; } (void)taosThreadMutexUnlock(&pVnode->lock); @@ -543,7 +544,11 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) do { appliedIdx = vnodeSyncAppliedIndex(pFsm); - ASSERT(appliedIdx <= commitIdx); + if (appliedIdx > commitIdx) { + vError("vgId:%d, restore failed since applied-index:%" PRId64 " is larger than commit-index:%" PRId64, vgId, + appliedIdx, commitIdx); + break; + } if (appliedIdx == commitIdx) { vInfo("vgId:%d, no items to be applied, restore finish", pVnode->config.vgId); break; @@ -555,7 +560,6 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) } } while (true); - ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm)); (void)walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 9c783f0cf6..ddcf4aebfa 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -87,14 +87,11 @@ struct SLRUEntry { #define TAOS_LRU_ENTRY_REF(h) (++(h)->refs) static bool taosLRUEntryUnref(SLRUEntry *entry) { - // ASSERT(entry->refs > 0); --entry->refs; return entry->refs == 0; } static void taosLRUEntryFree(SLRUEntry *entry) { - // ASSERT(entry->refs == 0); - if (entry->deleter) { (*entry->deleter)(entry->keyData, entry->keyLength, entry->value, entry->ud); } @@ -129,7 +126,6 @@ static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t SLRUEntry *h = table->list[i]; while (h) { SLRUEntry *n = h->nextHash; - // ASSERT(TAOS_LRU_ENTRY_IN_CACHE(h)); func(h); h = n; } @@ -155,7 +151,6 @@ static int taosLRUEntryTableApplyF(SLRUEntryTable *table, _taos_lru_functor_t fu SLRUEntry *h = table->list[i]; while (h) { SLRUEntry *n = h->nextHash; - // ASSERT(TAOS_LRU_ENTRY_IN_CACHE(h)); ret = functor(h->keyData, h->keyLength, h->value, ud); if (ret) { return ret; @@ -205,7 +200,6 @@ static void taosLRUEntryTableResize(SLRUEntryTable *table) { ++count; } } - // ASSERT(table->elems == count); taosMemoryFree(table->list); table->list = newList; @@ -261,17 +255,13 @@ struct SLRUCacheShard { static void taosLRUCacheShardMaintainPoolSize(SLRUCacheShard *shard) { while (shard->highPriPoolUsage > shard->highPriPoolCapacity) { shard->lruLowPri = shard->lruLowPri->next; - // ASSERT(shard->lruLowPri != &shard->lru); TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(shard->lruLowPri, false); - // ASSERT(shard->highPriPoolUsage >= shard->lruLowPri->totalCharge); shard->highPriPoolUsage -= shard->lruLowPri->totalCharge; } } static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) { - // ASSERT(e->next == NULL && e->prev == NULL); - if (shard->highPriPoolRatio > 0 && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) { e->next = &shard->lru; e->prev = shard->lru.prev; @@ -297,8 +287,6 @@ static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) { } static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) { - // ASSERT(e->next && e->prev); - if (shard->lruLowPri == e) { shard->lruLowPri = e->prev; } @@ -306,10 +294,8 @@ static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) { e->prev->next = e->next; e->prev = e->next = NULL; - // ASSERT(shard->lruUsage >= e->totalCharge); shard->lruUsage -= e->totalCharge; if (TAOS_LRU_ENTRY_IN_HIGH_POOL(e)) { - // ASSERT(shard->highPriPoolUsage >= e->totalCharge); shard->highPriPoolUsage -= e->totalCharge; } } @@ -317,13 +303,11 @@ static void taosLRUCacheShardLRURemove(SLRUCacheShard *shard, SLRUEntry *e) { static void taosLRUCacheShardEvictLRU(SLRUCacheShard *shard, size_t charge, SArray *deleted) { while (shard->usage + charge > shard->capacity && shard->lru.next != &shard->lru) { SLRUEntry *old = shard->lru.next; - // ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old)); taosLRUCacheShardLRURemove(shard, old); (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); - // ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; (void)taosArrayPush(deleted, &old); @@ -414,11 +398,9 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * if (old != NULL) { status = TAOS_LRU_STATUS_OK_OVERWRITTEN; - // ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old)); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); if (!TAOS_LRU_ENTRY_HAS_REFS(old)) { taosLRUCacheShardLRURemove(shard, old); - // ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; (void)taosArrayPush(lastReferenceList, &old); @@ -479,7 +461,6 @@ static LRUHandle *taosLRUCacheShardLookup(SLRUCacheShard *shard, const void *key (void)taosThreadMutexLock(&shard->mutex); e = taosLRUEntryTableLookup(&shard->table, key, keyLen, hash); if (e != NULL) { - // ASSERT(TAOS_LRU_ENTRY_IN_CACHE(e)); if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { taosLRUCacheShardLRURemove(shard, e); } @@ -498,12 +479,10 @@ static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_ SLRUEntry *e = taosLRUEntryTableRemove(&shard->table, key, keyLen, hash); if (e != NULL) { - // ASSERT(TAOS_LRU_ENTRY_IN_CACHE(e)); TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { taosLRUCacheShardLRURemove(shard, e); - // ASSERT(shard->usage >= e->totalCharge); shard->usage -= e->totalCharge; lastReference = true; } @@ -535,11 +514,9 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) { while (shard->lru.next != &shard->lru) { SLRUEntry *old = shard->lru.next; - // ASSERT(TAOS_LRU_ENTRY_IN_CACHE(old) && !TAOS_LRU_ENTRY_HAS_REFS(old)); taosLRUCacheShardLRURemove(shard, old); (void)taosLRUEntryTableRemove(&shard->table, old->keyData, old->keyLength, old->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); - // ASSERT(shard->usage >= old->totalCharge); shard->usage -= old->totalCharge; (void)taosArrayPush(lastReferenceList, &old); @@ -560,7 +537,6 @@ static bool taosLRUCacheShardRef(SLRUCacheShard *shard, LRUHandle *handle) { SLRUEntry *e = (SLRUEntry *)handle; (void)taosThreadMutexLock(&shard->mutex); - // ASSERT(TAOS_LRU_ENTRY_HAS_REFS(e)); TAOS_LRU_ENTRY_REF(e); (void)taosThreadMutexUnlock(&shard->mutex); @@ -581,8 +557,6 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b lastReference = taosLRUEntryUnref(e); if (lastReference && TAOS_LRU_ENTRY_IN_CACHE(e)) { if (shard->usage > shard->capacity || eraseIfLastRef) { - // ASSERT(shard->lru.next == &shard->lru || eraseIfLastRef); - (void)taosLRUEntryTableRemove(&shard->table, e->keyData, e->keyLength, e->hash); TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); } else { @@ -593,7 +567,6 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b } if (lastReference && e->value) { - // ASSERT(shard->usage >= e->totalCharge); shard->usage -= e->totalCharge; } @@ -631,7 +604,6 @@ static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) { (void)taosThreadMutexLock(&shard->mutex); - // ASSERT(shard->usage >= shard->lruUsage); usage = shard->usage - shard->lruUsage; (void)taosThreadMutexUnlock(&shard->mutex); @@ -723,7 +695,6 @@ void taosLRUCacheCleanup(SLRUCache *cache) { if (cache) { if (cache->shards) { int numShards = cache->numShards; - // ASSERT(numShards > 0); for (int i = 0; i < numShards; ++i) { taosLRUCacheShardCleanup(&cache->shards[i]); } From 235fb59ede06b94d5469416d2d3394ed6390ccee Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Thu, 22 Aug 2024 18:02:23 +0800 Subject: [PATCH 3/4] fix: (ttl) continue flush cache after error occurs --- include/util/taoserror.h | 1 + source/dnode/vnode/src/meta/metaTtl.c | 36 ++++++++++++++++----------- source/util/src/terror.c | 5 ++-- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 1911c48d26..58f906e3f3 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -530,6 +530,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_VND_COLUMN_COMPRESS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0536) #define TSDB_CODE_VND_ARB_NOT_SYNCED TAOS_DEF_ERROR_CODE(0, 0x0537) // internal #define TSDB_CODE_VND_WRITE_DISABLED TAOS_DEF_ERROR_CODE(0, 0x0538) // internal +#define TSDB_CODE_VND_TTL_FLUSH_INCOMPLETION TAOS_DEF_ERROR_CODE(0, 0x0539) // internal // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/dnode/vnode/src/meta/metaTtl.c b/source/dnode/vnode/src/meta/metaTtl.c index cd1aa7bcad..0b5b9280df 100644 --- a/source/dnode/vnode/src/meta/metaTtl.c +++ b/source/dnode/vnode/src/meta/metaTtl.c @@ -408,7 +408,7 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { int64_t startNs = taosGetTimestampNs(); int64_t endNs = startNs; - metaTrace("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids)); + metaTrace("%s, ttl mgr flush start. num of dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids)); int32_t code = TSDB_CODE_SUCCESS; @@ -419,13 +419,8 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid)); if (cacheEntry == NULL) { - metaInfo("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid, + metaError("%s, ttlMgr flush failed to get ttl cache, uid: %" PRId64 ", type: %d", pTtlMgr->logPrefix, *pUid, pEntry->type); - code = taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(*pUid)); - if (TSDB_CODE_SUCCESS != code) { - metaError("%s, ttlMgr flush failed to remove dirty uid since %s", pTtlMgr->logPrefix, tstrerror(code)); - goto _out; - } continue; } @@ -437,31 +432,35 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { if (pEntry->type == ENTRY_TYPE_UPSERT) { // delete old key & upsert new key - (void)tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error + code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); // maybe first insert, ignore error + if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) { + metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code)); + continue; + } code = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKeyDirty, sizeof(ttlKeyDirty), &cacheEntry->ttlDaysDirty, sizeof(cacheEntry->ttlDaysDirty), pTxn); if (TSDB_CODE_SUCCESS != code) { metaError("%s, ttlMgr flush failed to upsert since %s", pTtlMgr->logPrefix, tstrerror(code)); - goto _out; + continue; } cacheEntry->ttlDays = cacheEntry->ttlDaysDirty; cacheEntry->changeTimeMs = cacheEntry->changeTimeMsDirty; } else if (pEntry->type == ENTRY_TYPE_DELETE) { code = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn); - if (TSDB_CODE_SUCCESS != code) { + if (TSDB_CODE_SUCCESS != code && TSDB_CODE_NOT_FOUND != code) { metaError("%s, ttlMgr flush failed to delete since %s", pTtlMgr->logPrefix, tstrerror(code)); - goto _out; + continue; } code = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid)); if (TSDB_CODE_SUCCESS != code) { metaError("%s, ttlMgr flush failed to remove cache since %s", pTtlMgr->logPrefix, tstrerror(code)); - goto _out; + continue; } } else { metaError("%s, ttlMgr flush failed, unknown type: %d", pTtlMgr->logPrefix, pEntry->type); - goto _out; + continue; } metaDebug("isdel:%d", pEntry->type == ENTRY_TYPE_DELETE); @@ -471,11 +470,18 @@ int32_t ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) { code = taosHashRemove(pTtlMgr->pDirtyUids, pUid, sizeof(*pUid)); if (TSDB_CODE_SUCCESS != code) { metaError("%s, ttlMgr flush failed to remove dirty uid since %s", pTtlMgr->logPrefix, tstrerror(code)); - goto _out; + continue; } } - taosHashClear(pTtlMgr->pDirtyUids); + int32_t count = taosHashGetSize(pTtlMgr->pDirtyUids); + if (count != 0) { + taosHashClear(pTtlMgr->pDirtyUids); + metaError("%s, ttlMgr flush failed, dirty uids not empty, count: %d", pTtlMgr->logPrefix, count); + code = TSDB_CODE_VND_TTL_FLUSH_INCOMPLETION; + + goto _out; + } code = TSDB_CODE_SUCCESS; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 396abf21a7..0b6d2674fd 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -57,8 +57,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many ses TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") -TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit") -TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") @@ -411,6 +411,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_META_DATA_UNSAFE_DELETE, "Single replica vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_ARB_NOT_SYNCED, "Vgroup peer is not synced") TAOS_DEFINE_ERROR(TSDB_CODE_VND_WRITE_DISABLED, "Vnode write is disabled for snapshot") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COLUMN_COMPRESS_ALREADY_EXIST,"Same with old param") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_TTL_FLUSH_INCOMPLETION, "Failed to flush all ttl modification to tdb") // tsdb From 8fe4ba4a9f9f7b01012485ddd925c08bb4e26095 Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Thu, 22 Aug 2024 20:33:45 +0800 Subject: [PATCH 4/4] recover ci sample code --- examples/c/CMakeLists.txt | 78 +++++++ examples/c/asyncdemo.c | 293 ++++++++++++++++++++++++ examples/c/demo.c | 133 +++++++++++ examples/c/prepare.c | 235 ++++++++++++++++++++ examples/c/schemaless.c | 73 ++++++ examples/c/stream_demo.c | 120 ++++++++++ examples/c/tmq.c | 333 ++++++++++++++++++++++++++++ examples/rust/.gitignore | 2 + examples/rust/Cargo.toml | 18 ++ examples/rust/examples/bind-tags.rs | 80 +++++++ examples/rust/examples/bind.rs | 74 +++++++ examples/rust/examples/query.rs | 106 +++++++++ examples/rust/examples/subscribe.rs | 103 +++++++++ examples/rust/src/main.rs | 3 + examples/rust/wrapper.h | 1 + 15 files changed, 1652 insertions(+) create mode 100644 examples/c/CMakeLists.txt create mode 100644 examples/c/asyncdemo.c create mode 100644 examples/c/demo.c create mode 100644 examples/c/prepare.c create mode 100644 examples/c/schemaless.c create mode 100644 examples/c/stream_demo.c create mode 100644 examples/c/tmq.c create mode 100644 examples/rust/.gitignore create mode 100644 examples/rust/Cargo.toml create mode 100644 examples/rust/examples/bind-tags.rs create mode 100644 examples/rust/examples/bind.rs create mode 100644 examples/rust/examples/query.rs create mode 100644 examples/rust/examples/subscribe.rs create mode 100644 examples/rust/src/main.rs create mode 100644 examples/rust/wrapper.h diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt new file mode 100644 index 0000000000..07fc2fd71b --- /dev/null +++ b/examples/c/CMakeLists.txt @@ -0,0 +1,78 @@ +PROJECT(TDengine) + +IF (TD_LINUX) + INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc) + AUX_SOURCE_DIRECTORY(. SRC) + + add_executable(tmq "") + add_executable(stream_demo "") + add_executable(schemaless "") + add_executable(prepare "") + add_executable(demo "") + add_executable(asyncdemo "") + + target_sources(tmq + PRIVATE + "tmq.c" + ) + + target_sources(stream_demo + PRIVATE + "stream_demo.c" + ) + + target_sources(schemaless + PRIVATE + "schemaless.c" + ) + + target_sources(prepare + PRIVATE + "prepare.c" + ) + + target_sources(demo + PRIVATE + "demo.c" + ) + + target_sources(asyncdemo + PRIVATE + "asyncdemo.c" + ) + + target_link_libraries(tmq + taos + ) + + target_link_libraries(stream_demo + taos + ) + + target_link_libraries(schemaless + taos + ) + + target_link_libraries(prepare + taos + ) + + target_link_libraries(demo + taos + ) + + target_link_libraries(asyncdemo + taos + ) + + SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) + SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo) + SET_TARGET_PROPERTIES(schemaless PROPERTIES OUTPUT_NAME schemaless) + SET_TARGET_PROPERTIES(prepare PROPERTIES OUTPUT_NAME prepare) + SET_TARGET_PROPERTIES(demo PROPERTIES OUTPUT_NAME demo) + SET_TARGET_PROPERTIES(asyncdemo PROPERTIES OUTPUT_NAME asyncdemo) +ENDIF () +IF (TD_DARWIN) + INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc) + AUX_SOURCE_DIRECTORY(. SRC) +ENDIF () diff --git a/examples/c/asyncdemo.c b/examples/c/asyncdemo.c new file mode 100644 index 0000000000..91ec6f24b1 --- /dev/null +++ b/examples/c/asyncdemo.c @@ -0,0 +1,293 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS asynchronous API example +// this example opens multiple tables, insert/retrieve multiple tables +// it is used by TAOS internally for one performance testing +// to compiple: gcc -o asyncdemo asyncdemo.c -ltaos + +#include +#include +#include +#include +#include +#include +#include "taos.h" + +int points = 5; +int numOfTables = 3; +int tablesInsertProcessed = 0; +int tablesSelectProcessed = 0; +int64_t st, et; + +typedef struct { + int id; + TAOS *taos; + char name[32]; + time_t timeStamp; + int value; + int rowsInserted; + int rowsTried; + int rowsRetrieved; +} STable; + +void taos_insert_call_back(void *param, TAOS_RES *tres, int code); +void taos_select_call_back(void *param, TAOS_RES *tres, int code); +void shellPrintError(TAOS *taos); + +static void queryDB(TAOS *taos, char *command) { + int i; + TAOS_RES *pSql = NULL; + int32_t code = -1; + + for (i = 0; i < 5; i++) { + if (NULL != pSql) { + taos_free_result(pSql); + pSql = NULL; + } + + pSql = taos_query(taos, command); + code = taos_errno(pSql); + if (0 == code) { + break; + } + } + + if (code != 0) { + fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql)); + taos_free_result(pSql); + taos_close(taos); + taos_cleanup(); + exit(EXIT_FAILURE); + } + + taos_free_result(pSql); +} + +int main(int argc, char *argv[]) +{ + TAOS *taos; + struct timeval systemTime; + int i; + char sql[1024] = { 0 }; + char prefix[20] = { 0 }; + char db[128] = { 0 }; + STable *tableList; + + if (argc != 5) { + printf("usage: %s server-ip dbname rowsPerTable numOfTables\n", argv[0]); + exit(0); + } + + // a simple way to parse input parameters + if (argc >= 3) strncpy(db, argv[2], sizeof(db) - 1); + if (argc >= 4) points = atoi(argv[3]); + if (argc >= 5) numOfTables = atoi(argv[4]); + + size_t size = sizeof(STable) * (size_t)numOfTables; + tableList = (STable *)malloc(size); + memset(tableList, 0, size); + + taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + if (taos == NULL) + shellPrintError(taos); + + printf("success to connect to server\n"); + + sprintf(sql, "drop database if exists %s", db); + queryDB(taos, sql); + + sprintf(sql, "create database %s", db); + queryDB(taos, sql); + + sprintf(sql, "use %s", db); + queryDB(taos, sql); + + strcpy(prefix, "asytbl_"); + for (i = 0; i < numOfTables; ++i) { + tableList[i].id = i; + tableList[i].taos = taos; + sprintf(tableList[i].name, "%s%d", prefix, i); + sprintf(sql, "create table %s%d (ts timestamp, volume bigint)", prefix, i); + queryDB(taos, sql); + } + + gettimeofday(&systemTime, NULL); + for (i = 0; i < numOfTables; ++i) + tableList[i].timeStamp = (time_t)(systemTime.tv_sec) * 1000 + systemTime.tv_usec / 1000; + + printf("success to create tables, press any key to insert\n"); + getchar(); + + printf("start to insert...\n"); + gettimeofday(&systemTime, NULL); + st = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + + tablesInsertProcessed = 0; + tablesSelectProcessed = 0; + + for (i = 0; irowsTried++; + + if (code < 0) { + printf("%s insert failed, code:%d, rows:%d\n", pTable->name, code, pTable->rowsTried); + } + else if (code == 0) { + printf("%s not inserted\n", pTable->name); + } + else { + pTable->rowsInserted++; + } + + if (pTable->rowsTried < points) { + // for this demo, insert another record + sprintf(sql, "insert into %s values(%ld, %d)", pTable->name, 1546300800000+pTable->rowsTried*1000, pTable->rowsTried); + taos_query_a(pTable->taos, sql, taos_insert_call_back, (void *)pTable); + } + else { + printf("%d rows data are inserted into %s\n", points, pTable->name); + tablesInsertProcessed++; + if (tablesInsertProcessed >= numOfTables) { + gettimeofday(&systemTime, NULL); + et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + printf("%" PRId64 " mseconds to insert %d data points\n", (et - st) / 1000, points*numOfTables); + } + } + + taos_free_result(tres); +} + +void taos_retrieve_call_back(void *param, TAOS_RES *tres, int numOfRows) +{ + STable *pTable = (STable *)param; + struct timeval systemTime; + + if (numOfRows > 0) { + + for (int i = 0; irowsRetrieved += numOfRows; + + // retrieve next batch of rows + taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable); + + } + else { + if (numOfRows < 0) + printf("%s retrieve failed, code:%d\n", pTable->name, numOfRows); + + //taos_free_result(tres); + printf("%d rows data retrieved from %s\n", pTable->rowsRetrieved, pTable->name); + + tablesSelectProcessed++; + if (tablesSelectProcessed >= numOfTables) { + gettimeofday(&systemTime, NULL); + et = systemTime.tv_sec * 1000000 + systemTime.tv_usec; + printf("%" PRId64 " mseconds to query %d data rows\n", (et - st) / 1000, points * numOfTables); + } + + taos_free_result(tres); + } + + +} + +void taos_select_call_back(void *param, TAOS_RES *tres, int code) +{ + STable *pTable = (STable *)param; + + if (code == 0 && tres) { + // asynchronous API to fetch a batch of records + taos_fetch_rows_a(tres, taos_retrieve_call_back, pTable); + } + else { + printf("%s select failed, code:%d\n", pTable->name, code); + taos_free_result(tres); + taos_cleanup(); + exit(1); + } +} diff --git a/examples/c/demo.c b/examples/c/demo.c new file mode 100644 index 0000000000..0351aecbd1 --- /dev/null +++ b/examples/c/demo.c @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// TAOS standard API example. The same syntax as MySQL, but only a subset +// to compile: gcc -o demo demo.c -ltaos + +#include +#include +#include +#include +#include "taos.h" // TAOS header file + +static void queryDB(TAOS *taos, char *command) { + int i; + TAOS_RES *pSql = NULL; + int32_t code = -1; + + for (i = 0; i < 5; i++) { + if (NULL != pSql) { + taos_free_result(pSql); + pSql = NULL; + } + + pSql = taos_query(taos, command); + code = taos_errno(pSql); + if (0 == code) { + break; + } + } + + if (code != 0) { + fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(pSql)); + taos_free_result(pSql); + taos_close(taos); + exit(EXIT_FAILURE); + } + + taos_free_result(pSql); +} + +void Test(TAOS *taos, char *qstr, int i); + +int main(int argc, char *argv[]) { + char qstr[1024]; + + // connect to server + if (argc < 2) { + printf("please input server-ip \n"); + return 0; + } + + TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + if (taos == NULL) { + printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); + exit(1); + } + for (int i = 0; i < 100; i++) { + Test(taos, qstr, i); + } + taos_close(taos); + taos_cleanup(); +} +void Test(TAOS *taos, char *qstr, int index) { + printf("==================test at %d\n================================", index); + queryDB(taos, "drop database if exists demo"); + queryDB(taos, "create database demo"); + TAOS_RES *result; + queryDB(taos, "use demo"); + + queryDB(taos, "create table m1 (ts timestamp, ti tinyint, si smallint, i int, bi bigint, f float, d double, b binary(10))"); + printf("success to create table\n"); + + int i = 0; + for (i = 0; i < 10; ++i) { + sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello"); + printf("qstr: %s\n", qstr); + + // note: how do you wanna do if taos_query returns non-NULL + // if (taos_query(taos, qstr)) { + // printf("insert row: %i, reason:%s\n", i, taos_errstr(taos)); + // } + TAOS_RES *result1 = taos_query(taos, qstr); + if (result1 == NULL || taos_errno(result1) != 0) { + printf("failed to insert row, reason:%s\n", taos_errstr(result1)); + taos_free_result(result1); + exit(1); + } else { + printf("insert row: %i\n", i); + } + taos_free_result(result1); + } + printf("success to insert rows, total %d rows\n", i); + + // query the records + sprintf(qstr, "SELECT * FROM m1"); + result = taos_query(taos, qstr); + if (result == NULL || taos_errno(result) != 0) { + printf("failed to select, reason:%s\n", taos_errstr(result)); + taos_free_result(result); + exit(1); + } + + TAOS_ROW row; + int rows = 0; + int num_fields = taos_field_count(result); + TAOS_FIELD *fields = taos_fetch_fields(result); + + printf("num_fields = %d\n", num_fields); + printf("select * from table, result:\n"); + // fetch the records row by row + while ((row = taos_fetch_row(result))) { + char temp[1024] = {0}; + rows++; + taos_print_row(temp, row, fields, num_fields); + printf("%s\n", temp); + } + + taos_free_result(result); + printf("====demo end====\n\n"); +} + diff --git a/examples/c/prepare.c b/examples/c/prepare.c new file mode 100644 index 0000000000..aee8400663 --- /dev/null +++ b/examples/c/prepare.c @@ -0,0 +1,235 @@ +// TAOS standard API example. The same syntax as MySQL, but only a subet +// to compile: gcc -o prepare prepare.c -ltaos + +#include +#include +#include +#include "taos.h" + +void taosMsleep(int mseconds); + +int main(int argc, char *argv[]) +{ + TAOS *taos; + TAOS_RES *result; + int code; + TAOS_STMT *stmt; + + // connect to server + if (argc < 2) { + printf("please input server ip \n"); + return 0; + } + + taos = taos_connect(argv[1], "root", "taosdata", NULL, 0); + if (taos == NULL) { + printf("failed to connect to db, reason:%s\n", taos_errstr(taos)); + exit(1); + } + + result = taos_query(taos, "drop database demo"); + taos_free_result(result); + + result = taos_query(taos, "create database demo"); + code = taos_errno(result); + if (code != 0) { + printf("failed to create database, reason:%s\n", taos_errstr(result)); + taos_free_result(result); + exit(1); + } + taos_free_result(result); + + result = taos_query(taos, "use demo"); + taos_free_result(result); + + // create table + const char* sql = "create table m1 (ts timestamp, b bool, v1 tinyint, v2 smallint, v4 int, v8 bigint, f4 float, f8 double, bin binary(40), blob nchar(10), varbin varbinary(16))"; + result = taos_query(taos, sql); + code = taos_errno(result); + if (code != 0) { + printf("failed to create table, reason:%s\n", taos_errstr(result)); + taos_free_result(result); + exit(1); + } + taos_free_result(result); + + // sleep for one second to make sure table is created on data node + // taosMsleep(1000); + + // insert 10 records + struct { + int64_t ts; + int8_t b; + int8_t v1; + int16_t v2; + int32_t v4; + int64_t v8; + float f4; + double f8; + char bin[40]; + char blob[80]; + int8_t varbin[16]; + } v = {0}; + + int32_t boolLen = sizeof(int8_t); + int32_t sintLen = sizeof(int16_t); + int32_t intLen = sizeof(int32_t); + int32_t bintLen = sizeof(int64_t); + int32_t floatLen = sizeof(float); + int32_t doubleLen = sizeof(double); + int32_t binLen = sizeof(v.bin); + int32_t ncharLen = 30; + + stmt = taos_stmt_init(taos); + TAOS_MULTI_BIND params[11]; + params[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP; + params[0].buffer_length = sizeof(v.ts); + params[0].buffer = &v.ts; + params[0].length = &bintLen; + params[0].is_null = NULL; + params[0].num = 1; + + params[1].buffer_type = TSDB_DATA_TYPE_BOOL; + params[1].buffer_length = sizeof(v.b); + params[1].buffer = &v.b; + params[1].length = &boolLen; + params[1].is_null = NULL; + params[1].num = 1; + + params[2].buffer_type = TSDB_DATA_TYPE_TINYINT; + params[2].buffer_length = sizeof(v.v1); + params[2].buffer = &v.v1; + params[2].length = &boolLen; + params[2].is_null = NULL; + params[2].num = 1; + + params[3].buffer_type = TSDB_DATA_TYPE_SMALLINT; + params[3].buffer_length = sizeof(v.v2); + params[3].buffer = &v.v2; + params[3].length = &sintLen; + params[3].is_null = NULL; + params[3].num = 1; + + params[4].buffer_type = TSDB_DATA_TYPE_INT; + params[4].buffer_length = sizeof(v.v4); + params[4].buffer = &v.v4; + params[4].length = &intLen; + params[4].is_null = NULL; + params[4].num = 1; + + params[5].buffer_type = TSDB_DATA_TYPE_BIGINT; + params[5].buffer_length = sizeof(v.v8); + params[5].buffer = &v.v8; + params[5].length = &bintLen; + params[5].is_null = NULL; + params[5].num = 1; + + params[6].buffer_type = TSDB_DATA_TYPE_FLOAT; + params[6].buffer_length = sizeof(v.f4); + params[6].buffer = &v.f4; + params[6].length = &floatLen; + params[6].is_null = NULL; + params[6].num = 1; + + params[7].buffer_type = TSDB_DATA_TYPE_DOUBLE; + params[7].buffer_length = sizeof(v.f8); + params[7].buffer = &v.f8; + params[7].length = &doubleLen; + params[7].is_null = NULL; + params[7].num = 1; + + params[8].buffer_type = TSDB_DATA_TYPE_BINARY; + params[8].buffer_length = sizeof(v.bin); + params[8].buffer = v.bin; + params[8].length = &binLen; + params[8].is_null = NULL; + params[8].num = 1; + + strcpy(v.blob, "一二三四五六七八九十"); + params[9].buffer_type = TSDB_DATA_TYPE_NCHAR; + params[9].buffer_length = sizeof(v.blob); + params[9].buffer = v.blob; + params[9].length = &ncharLen; + params[9].is_null = NULL; + params[9].num = 1; + + int8_t tmp[16] = {'a', 0, 1, 13, '1'}; + int32_t vbinLen = 5; + memcpy(v.varbin, tmp, sizeof(v.varbin)); + params[10].buffer_type = TSDB_DATA_TYPE_VARBINARY; + params[10].buffer_length = sizeof(v.varbin); + params[10].buffer = v.varbin; + params[10].length = &vbinLen; + params[10].is_null = NULL; + params[10].num = 1; + + char is_null = 1; + + sql = "insert into m1 values(?,?,?,?,?,?,?,?,?,?,?)"; + code = taos_stmt_prepare(stmt, sql, 0); + if (code != 0){ + printf("failed to execute taos_stmt_prepare. code:0x%x\n", code); + } + v.ts = 1591060628000; + for (int i = 0; i < 10; ++i) { + v.ts += 1; + for (int j = 1; j < 11; ++j) { + params[j].is_null = ((i == j) ? &is_null : 0); + } + v.b = (int8_t)i % 2; + v.v1 = (int8_t)i; + v.v2 = (int16_t)(i * 2); + v.v4 = (int32_t)(i * 4); + v.v8 = (int64_t)(i * 8); + v.f4 = (float)(i * 40); + v.f8 = (double)(i * 80); + for (int j = 0; j < sizeof(v.bin); ++j) { + v.bin[j] = (char)(i + '0'); + } + + taos_stmt_bind_param(stmt, params); + taos_stmt_add_batch(stmt); + } + if (taos_stmt_execute(stmt) != 0) { + printf("failed to execute insert statement.\n"); + exit(1); + } + taos_stmt_close(stmt); + + // query the records + stmt = taos_stmt_init(taos); + taos_stmt_prepare(stmt, "SELECT * FROM m1 WHERE v1 > ? AND v2 < ?", 0); + v.v1 = 5; + v.v2 = 15; + taos_stmt_bind_param(stmt, params + 2); + if (taos_stmt_execute(stmt) != 0) { + printf("failed to execute select statement.\n"); + exit(1); + } + + result = taos_stmt_use_result(stmt); + + TAOS_ROW row; + int rows = 0; + int num_fields = taos_num_fields(result); + TAOS_FIELD *fields = taos_fetch_fields(result); + + // fetch the records row by row + while ((row = taos_fetch_row(result))) { + char temp[256] = {0}; + rows++; + taos_print_row(temp, row, fields, num_fields); + printf("%s\n", temp); + } + if (rows == 2) { + printf("two rows are fetched as expectation\n"); + } else { + printf("expect two rows, but %d rows are fetched\n", rows); + } + +// taos_free_result(result); + taos_stmt_close(stmt); + + return 0; +} + diff --git a/examples/c/schemaless.c b/examples/c/schemaless.c new file mode 100644 index 0000000000..328a663ae7 --- /dev/null +++ b/examples/c/schemaless.c @@ -0,0 +1,73 @@ +#include "taos.h" +#include +#include +#include +#include +#include +#include +#include + + +int numSuperTables = 8; +int numChildTables = 4; +int numRowsPerChildTable = 2048; + +static int64_t getTimeInUs() { + struct timeval systemTime; + gettimeofday(&systemTime, NULL); + return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec; +} + +int main(int argc, char* argv[]) { + TAOS_RES *result; + const char* host = "127.0.0.1"; + const char* user = "root"; + const char* passwd = "taosdata"; + + taos_options(TSDB_OPTION_TIMEZONE, "GMT-8"); + TAOS* taos = taos_connect(host, user, passwd, "", 0); + if (taos == NULL) { + printf("\033[31mfailed to connect to db, reason:%s\033[0m\n", taos_errstr(taos)); + exit(1); + } + + const char* info = taos_get_server_info(taos); + printf("server info: %s\n", info); + info = taos_get_client_info(taos); + printf("client info: %s\n", info); + result = taos_query(taos, "drop database if exists db;"); + taos_free_result(result); + usleep(100000); + result = taos_query(taos, "create database db precision 'ms';"); + taos_free_result(result); + usleep(100000); + + (void)taos_select_db(taos, "db"); + + time_t ct = time(0); + int64_t ts = ct * 1000; + char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %" PRId64; + + int lineNum = numSuperTables * numChildTables * numRowsPerChildTable; + char** lines = calloc((size_t)lineNum, sizeof(char*)); + int l = 0; + for (int i = 0; i < numSuperTables; ++i) { + for (int j = 0; j < numChildTables; ++j) { + for (int k = 0; k < numRowsPerChildTable; ++k) { + char* line = calloc(512, 1); + snprintf(line, 512, lineFormat, i, j, ts + 10 * l); + lines[l] = line; + ++l; + } + } + } + + printf("%s\n", "begin taos_insert_lines"); + int64_t begin = getTimeInUs(); + TAOS_RES *res = taos_schemaless_insert(taos, lines, lineNum, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_MILLI_SECONDS); + int64_t end = getTimeInUs(); + printf("code: %s. time used: %" PRId64 "\n", taos_errstr(res), end-begin); + taos_free_result(res); + + return 0; +} diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c new file mode 100644 index 0000000000..46107c7e13 --- /dev/null +++ b/examples/c/stream_demo.c @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// clang-format off +#include +#include +#include +#include +#include "taos.h" + +int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + +#if 0 + pRes = taos_query(pConn, "create database if not exists abc2 vgroups 20"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); +#endif + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int, j varchar(20)) tags(a varchar(20))"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags('c1')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags('c2')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists tu3 using st1 tags('c3')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + return 0; +} + +int32_t create_stream() { + printf("create stream\n"); + TAOS_RES* pRes; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, + /*"create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");*/ + "create stream stream2 into outstb subtable(concat(concat(concat('prefix_', tname), '_suffix_'), cast(k1 as varchar(20)))) as select _wstart wstart, avg(k) from st1 partition by tbname tname, a k1 interval(10s);"); + if (taos_errno(pRes) != 0) { + printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + taos_close(pConn); + return 0; +} + +int main(int argc, char* argv[]) { + if (argc > 1) { + printf("env init\n"); + int code = init_env(); + if (code) { + return code; + } + + } + create_stream(); +} diff --git a/examples/c/tmq.c b/examples/c/tmq.c new file mode 100644 index 0000000000..15ab4fcfc9 --- /dev/null +++ b/examples/c/tmq.c @@ -0,0 +1,333 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" + +static int running = 1; +const char* topic_name = "topicname"; + +static int32_t msg_process(TAOS_RES* msg) { + char buf[1024]; + int32_t rows = 0; + + const char* topicName = tmq_get_topic_name(msg); + const char* dbName = tmq_get_db_name(msg); + int32_t vgroupId = tmq_get_vgroup_id(msg); + + printf("topic: %s\n", topicName); + printf("db: %s\n", dbName); + printf("vgroup id: %d\n", vgroupId); + + while (1) { + TAOS_ROW row = taos_fetch_row(msg); + if (row == NULL) break; + + TAOS_FIELD* fields = taos_fetch_fields(msg); + int32_t numOfFields = taos_field_count(msg); + // int32_t* length = taos_fetch_lengths(msg); + int32_t precision = taos_result_precision(msg); + rows++; + taos_print_row(buf, row, fields, numOfFields); + printf("precision: %d, row content: %s\n", precision, buf); + } + + return rows; +} + +static int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes; + // drop database if exists + printf("create database\n"); + pRes = taos_query(pConn, "drop topic topicname"); + if (taos_errno(pRes) != 0) { + printf("error in drop topicname, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists tmqdb"); + if (taos_errno(pRes) != 0) { + printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + // create database + pRes = taos_query(pConn, "create database tmqdb precision 'ns' WAL_RETENTION_PERIOD 3600"); + if (taos_errno(pRes) != 0) { + printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + // create super table + printf("create super table\n"); + pRes = taos_query( + pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + // create sub tables + printf("create sub tables\n"); + pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + // insert data + printf("insert data into sub tables\n"); + pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); + goto END; + } + taos_free_result(pRes); + taos_close(pConn); + return 0; + +END: + taos_free_result(pRes); + taos_close(pConn); + return -1; +} + +int32_t create_topic() { + printf("create topic\n"); + TAOS_RES* pRes; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + pRes = taos_query(pConn, "use tmqdb"); + if (taos_errno(pRes) != 0) { + printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1"); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { + tmq_conf_res_t code; + tmq_t* tmq = NULL; + + tmq_conf_t* conf = tmq_conf_new(); + code = tmq_conf_set(conf, "enable.auto.commit", "true"); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "group.id", "cgrpName"); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "client.id", "user defined name"); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "td.connect.user", "root"); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq = tmq_consumer_new(conf, NULL, 0); + +_end: + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topicList = tmq_list_new(); + int32_t code = tmq_list_append(topicList, topic_name); + if (code) { + tmq_list_destroy(topicList); + return NULL; + } + return topicList; +} + +void basic_consume_loop(tmq_t* tmq) { + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 5000; + while (running) { + TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); + if (tmqmsg) { + msgCnt++; + totalRows += msg_process(tmqmsg); + taos_free_result(tmqmsg); + } else { + break; + } + } + + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} + +void consume_repeatly(tmq_t* tmq) { + int32_t numOfAssignment = 0; + tmq_topic_assignment* pAssign = NULL; + + int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment); + if (code != 0) { + fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code)); + } + + // seek to the earliest offset + for(int32_t i = 0; i < numOfAssignment; ++i) { + tmq_topic_assignment* p = &pAssign[i]; + + code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin); + if (code != 0) { + fprintf(stderr, "failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code)); + } + } + + tmq_free_assignment(pAssign); + + // let's do it again + basic_consume_loop(tmq); +} + +int main(int argc, char* argv[]) { + int32_t code; + + if (init_env() < 0) { + return -1; + } + + if (create_topic() < 0) { + return -1; + } + + tmq_t* tmq = build_consumer(); + if (NULL == tmq) { + fprintf(stderr, "build_consumer() fail!\n"); + return -1; + } + + tmq_list_t* topic_list = build_topic_list(); + if (NULL == topic_list) { + return -1; + } + + if ((code = tmq_subscribe(tmq, topic_list))) { + fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); + } + + tmq_list_destroy(topic_list); + + basic_consume_loop(tmq); + + consume_repeatly(tmq); + + code = tmq_consumer_close(tmq); + if (code) { + fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code)); + } else { + fprintf(stderr, "Consumer closed\n"); + } + + return 0; +} diff --git a/examples/rust/.gitignore b/examples/rust/.gitignore new file mode 100644 index 0000000000..96ef6c0b94 --- /dev/null +++ b/examples/rust/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml new file mode 100644 index 0000000000..1ed73e2fde --- /dev/null +++ b/examples/rust/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rust" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +taos = "*" + +[dev-dependencies] +chrono = "0.4" +itertools = "0.10.3" +pretty_env_logger = "0.4.0" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["full"] } +anyhow = "1" diff --git a/examples/rust/examples/bind-tags.rs b/examples/rust/examples/bind-tags.rs new file mode 100644 index 0000000000..a1f7286625 --- /dev/null +++ b/examples/rust/examples/bind-tags.rs @@ -0,0 +1,80 @@ +use anyhow::Result; +use serde::Deserialize; +use taos::*; + +#[tokio::main] +async fn main() -> Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + taos.exec_many([ + "drop database if exists test", + "create database test keep 36500", + "use test", + "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, + c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, + c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t1 varchar(100))", + ]) + .await?; + let mut stmt = Stmt::init(&taos)?; + stmt.prepare( + "insert into ? using tb1 tags(?) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + )?; + stmt.set_tbname("d0")?; + stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; + + let params = vec![ + ColumnView::from_millis_timestamp(vec![164000000000]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ]; + let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; + assert_eq!(rows, 1); + + #[derive(Debug, Deserialize)] + #[allow(dead_code)] + struct Row { + ts: String, + c1: bool, + c2: i8, + c3: i16, + c4: i32, + c5: i64, + c6: u8, + c7: u16, + c8: u32, + c9: u64, + c10: Option, + c11: f64, + c12: String, + c13: String, + t1: serde_json::Value, + } + + let rows: Vec = taos + .query("select * from tb1") + .await? + .deserialize() + .try_collect() + .await?; + let row = &rows[0]; + dbg!(&row); + assert_eq!(row.c5, i64::MAX); + assert_eq!(row.c8, u32::MAX); + assert_eq!(row.c9, u64::MAX); + assert_eq!(row.c10.unwrap(), f32::MAX); + // assert_eq!(row.c11, f64::MAX); + assert_eq!(row.c12, "ABC"); + assert_eq!(row.c13, "涛思数据"); + + Ok(()) +} diff --git a/examples/rust/examples/bind.rs b/examples/rust/examples/bind.rs new file mode 100644 index 0000000000..194938a319 --- /dev/null +++ b/examples/rust/examples/bind.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use serde::Deserialize; +use taos::*; + +#[tokio::main] +async fn main() -> Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + taos.exec_many([ + "drop database if exists test_bindable", + "create database test_bindable keep 36500", + "use test_bindable", + "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, + c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, + c10 float, c11 double, c12 varchar(100), c13 nchar(100))", + ]) + .await?; + let mut stmt = Stmt::init(&taos)?; + stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")?; + let params = vec![ + ColumnView::from_millis_timestamp(vec![0]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ]; + let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; + assert_eq!(rows, 1); + + #[derive(Debug, Deserialize)] + #[allow(dead_code)] + struct Row { + ts: String, + c1: bool, + c2: i8, + c3: i16, + c4: i32, + c5: i64, + c6: u8, + c7: u16, + c8: u32, + c9: u64, + c10: Option, + c11: f64, + c12: String, + c13: String, + } + + let rows: Vec = taos + .query("select * from tb1") + .await? + .deserialize() + .try_collect() + .await?; + let row = &rows[0]; + dbg!(&row); + assert_eq!(row.c5, i64::MAX); + assert_eq!(row.c8, u32::MAX); + assert_eq!(row.c9, u64::MAX); + assert_eq!(row.c10.unwrap(), f32::MAX); + // assert_eq!(row.c11, f64::MAX); + assert_eq!(row.c12, "ABC"); + assert_eq!(row.c13, "涛思数据"); + + Ok(()) +} diff --git a/examples/rust/examples/query.rs b/examples/rust/examples/query.rs new file mode 100644 index 0000000000..016b291abc --- /dev/null +++ b/examples/rust/examples/query.rs @@ -0,0 +1,106 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://"; + + let opts = PoolBuilder::new() + .max_size(5000) // max connections + .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection + .min_idle(Some(1000)) // minimal idle connections + .connection_timeout(Duration::from_secs(2)); + + let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?; + + let taos = pool.get()?; + + let db = "query"; + + // prepare database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + let inserted = taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))", + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + + assert_eq!(inserted, 6); + loop { + let count: usize = taos + .query_one("select count(*) from `meters`") + .await? + .unwrap_or_default(); + + if count >= 6 { + break; + } else { + println!("waiting for data"); + } + } + + let mut result = taos.query("select tbname, * from `meters`").await?; + + for field in result.fields() { + println!("got field: {}", field.name()); + } + + // Query option 1, use rows stream. + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); + } + nrows += 1; + } + + // Query options 2, use deserialization with serde. + #[derive(Debug, serde::Deserialize)] + #[allow(dead_code)] + struct Record { + tbname: String, + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, + } + + let records: Vec = taos + .query("select tbname, * from `meters`") + .await? + .deserialize() + .try_collect() + .await?; + + dbg!(result.summary()); + assert_eq!(records.len(), 6); + dbg!(records); + Ok(()) +} diff --git a/examples/rust/examples/subscribe.rs b/examples/rust/examples/subscribe.rs new file mode 100644 index 0000000000..9e2e890405 --- /dev/null +++ b/examples/rust/examples/subscribe.rs @@ -0,0 +1,103 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +// Query options 2, use deserialization with serde. +#[derive(Debug, serde::Deserialize)] +#[allow(dead_code)] +struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, +} + +async fn prepare(taos: Taos) -> anyhow::Result<()> { + let inserted = taos.exec_many([ + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + assert_eq!(inserted, 6); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // std::env::set_var("RUST_LOG", "debug"); + pretty_env_logger::init(); + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + let db = "tmq"; + + // prepare database + taos.exec_many([ + "DROP TOPIC IF EXISTS tmq_meters".to_string(), + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))".to_string(), + // create topic for subscription + format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") + ]) + .await?; + + let task = tokio::spawn(prepare(taos)); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // subscribe + let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; + + let mut consumer = tmq.build()?; + consumer.subscribe(["tmq_meters"]).await?; + + { + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } + } + + consumer.unsubscribe().await; + + task.await??; + + Ok(()) +} diff --git a/examples/rust/src/main.rs b/examples/rust/src/main.rs new file mode 100644 index 0000000000..e7a11a969c --- /dev/null +++ b/examples/rust/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/examples/rust/wrapper.h b/examples/rust/wrapper.h new file mode 100644 index 0000000000..78857597a9 --- /dev/null +++ b/examples/rust/wrapper.h @@ -0,0 +1 @@ +#include