merge 3.0

Xiaoyu Wang 2022-07-01 13:35:07 +08:00
commit e235c9830d
53 changed files with 1329 additions and 526 deletions


@@ -206,8 +206,8 @@ Note: InfluxDB token authorization is not supported at present. Only Basic autho
 You can use any client that supports the http protocol to access the RESTful interface address `http://<fqdn>:6041/<APIEndPoint>` to write data in OpenTSDB compatible format to TDengine.
 ```text
-/opentsdb/v1/put/json/:db
-/opentsdb/v1/put/telnet/:db
+/opentsdb/v1/put/json/<db>
+/opentsdb/v1/put/telnet/<db>
 ```
 ### collectd
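As a concrete illustration of the endpoints documented above, here is a minimal C sketch that posts one OpenTSDB-format JSON point through taosAdapter's RESTful interface using libcurl. The host, database name, credentials, metric, and tag values are placeholders, and Basic authorization is assumed as described above; treat this as a sketch rather than official client code.

```c
#include <stdio.h>
#include <curl/curl.h>

int main(void) {
  // One OpenTSDB-style JSON data point; metric and tag names are illustrative only.
  const char *payload =
      "{\"metric\":\"sys.cpu.usage\",\"timestamp\":1656652800000,"
      "\"value\":13.5,\"tags\":{\"host\":\"web01\"}}";

  curl_global_init(CURL_GLOBAL_DEFAULT);
  CURL *curl = curl_easy_init();
  if (curl == NULL) return 1;

  struct curl_slist *hdrs = curl_slist_append(NULL, "Content-Type: application/json");
  // <db> in the endpoint is the target database; "test" is a placeholder.
  curl_easy_setopt(curl, CURLOPT_URL, "http://localhost:6041/opentsdb/v1/put/json/test");
  curl_easy_setopt(curl, CURLOPT_USERPWD, "root:taosdata");  // Basic authorization user:password
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, hdrs);
  curl_easy_setopt(curl, CURLOPT_POSTFIELDS, payload);       // implies an HTTP POST

  CURLcode rc = curl_easy_perform(curl);
  if (rc != CURLE_OK) {
    fprintf(stderr, "request failed: %s\n", curl_easy_strerror(rc));
  }

  curl_slist_free_all(hdrs);
  curl_easy_cleanup(curl);
  curl_global_cleanup();
  return rc == CURLE_OK ? 0 : 1;
}
```

The `/opentsdb/v1/put/telnet/<db>` endpoint is used the same way, with an OpenTSDB telnet-format text body instead of JSON.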


@@ -207,8 +207,8 @@ AllowWebSockets
 You can use any client that supports the http protocol to access the RESTful interface address `http://<fqdn>:6041/<APIEndPoint>` to write data in OpenTSDB compatible format to TDengine. The EndPoints are as follows:
 ```text
-/opentsdb/v1/put/json/:db
-/opentsdb/v1/put/telnet/:db
+/opentsdb/v1/put/json/<db>
+/opentsdb/v1/put/telnet/<db>
 ```
 ### collectd


@@ -34,6 +34,7 @@ typedef enum EFunctionType {
   FUNCTION_TYPE_ELAPSED,
   FUNCTION_TYPE_IRATE,
   FUNCTION_TYPE_LAST_ROW,
+  FUNCTION_TYPE_LAST_ROWT,  //TODO: removed
   FUNCTION_TYPE_MAX,
   FUNCTION_TYPE_MIN,
   FUNCTION_TYPE_MODE,


@@ -210,6 +210,8 @@ void walCloseRef(SWalRef *);
 int32_t walRefVer(SWalRef *, int64_t ver);
 int32_t walUnrefVer(SWal *);
+bool walLogExist(SWal *, int64_t ver);
+
 // lifecycle check
 bool walIsEmpty(SWal *);
 int64_t walGetFirstVer(SWal *);


@@ -60,6 +60,13 @@ extern "C" {
   })
 #endif
 
+#ifndef __COMPAR_FN_T
+#define __COMPAR_FN_T
+typedef int32_t (*__compar_fn_t)(const void *, const void *);
+#endif
+
+void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar);
+
 #ifdef __cplusplus
 }
 #endif


@ -1547,7 +1547,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
} }
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*printf("call poll\n");*/ /*tscDebug("call poll");*/
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
@ -1609,8 +1609,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int64_t transporterId = 0; int64_t transporterId = 0;
/*printf("send poll\n");*/ /*printf("send poll\n");*/
char offsetFormatBuf[50]; char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 50, &pVg->currentOffsetNew); tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId, tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
@ -1708,6 +1708,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} }
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
/*tscDebug("call poll1");*/
void* rspObj; void* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();


@ -892,7 +892,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
int64_t p0 = taosGetTimestampUs(); int64_t p0 = taosGetTimestampUs();
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order); __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
qsort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn); taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);
int64_t p1 = taosGetTimestampUs(); int64_t p1 = taosGetTimestampUs();
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows); uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);
@ -1218,7 +1218,7 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i);
SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i);
if (pSrc->pData == NULL) { if (pSrc->pData == NULL && (!IS_VAR_DATA_TYPE(pSrc->info.type))) {
continue; continue;
} }


@ -931,9 +931,9 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) {
// sort // sort
if (isJson) { if (isJson) {
qsort(pArray->pData, nTag, sizeof(STagVal), tTagValJsonCmprFn); taosSort(pArray->pData, nTag, sizeof(STagVal), tTagValJsonCmprFn);
} else { } else {
qsort(pArray->pData, nTag, sizeof(STagVal), tTagValCmprFn); taosSort(pArray->pData, nTag, sizeof(STagVal), tTagValCmprFn);
} }
// get size // get size


@ -5402,9 +5402,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} else if (pVal->type == TMQ_OFFSET__LOG) { } else if (pVal->type == TMQ_OFFSET__LOG) {
snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version); snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
snprintf(buf, maxLen, "offset(snapshot data) uid:%ld, ts:%ld", pVal->uid, pVal->ts); snprintf(buf, maxLen, "offset(ss data) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) { } else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(snapshot meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts); snprintf(buf, maxLen, "offset(ss meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
} else { } else {
ASSERT(0); ASSERT(0);
} }
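A side note on the shortened labels above: together with the `char buf[50]` to `char buf[80]` enlargements elsewhere in this commit, they look like guards against `snprintf` truncation (an inference; the commit does not say so). A snapshot offset prints a uid and a ts, and two 64-bit values plus the label can exceed 50 characters; `snprintf` never overruns the buffer, it silently truncates, as this small standalone sketch shows:

```c
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>

int main(void) {
  char buf[50];
  // Worst-case 64-bit values: the full string needs about 70 characters.
  int64_t uid = INT64_MIN, ts = INT64_MIN;
  int needed = snprintf(buf, sizeof(buf), "offset(snapshot data) uid:%" PRId64 ", ts:%" PRId64, uid, ts);
  // snprintf returns the length it wanted to write; buf holds only a truncated prefix.
  printf("needed %d characters, stored \"%s\"\n", needed, buf);
  return 0;
}
```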


@ -113,7 +113,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
if (!pDataBlocks) { if (!pDataBlocks) {
terrno = TSDB_CODE_TSMA_INVALID_PTR; terrno = TSDB_CODE_TSMA_INVALID_PTR;
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma)); smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
return terrno; return TSDB_CODE_FAILED;
} }
if (taosArrayGetSize(pDataBlocks) <= 0) { if (taosArrayGetSize(pDataBlocks) <= 0) {
@ -127,9 +127,9 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
SSmaStat *pStat = NULL; SSmaStat *pStat = NULL;
STSmaStat *pItem = NULL; STSmaStat *pTsmaStat = NULL;
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
terrno = TSDB_CODE_TSMA_INVALID_STAT; terrno = TSDB_CODE_TSMA_INVALID_STAT;
@ -137,32 +137,43 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
tdRefSmaStat(pSma, pStat); tdRefSmaStat(pSma, pStat);
pItem = &pStat->tsmaStat; pTsmaStat = SMA_TSMA_STAT(pStat);
ASSERT(pItem); if (!pTsmaStat->pTSma) {
if (!pItem->pTSma) {
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
if (!pTSma) { if (!pTSma) {
terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META; smaError("vgId:%d, failed to get STSma while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
smaWarn("vgId:%d, tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); indexUid, tstrerror(terrno));
return TSDB_CODE_FAILED; goto _err;
}
pTsmaStat->pTSma = pTSma;
pTsmaStat->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1);
if (!pTsmaStat->pTSchema) {
smaError("vgId:%d, failed to get STSchema while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
} }
pItem->pTSma = pTSma;
pItem->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1);
ASSERT(pItem->pTSchema); // TODO
} }
ASSERT(pItem->pTSma->indexUid == indexUid); if (pTsmaStat->pTSma->indexUid != indexUid) {
terrno = TSDB_CODE_VND_APP_ERROR;
smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 "(!=%" PRIi64 ") failed since %s", SMA_VID(pSma), indexUid,
pTsmaStat->pTSma->indexUid, tstrerror(terrno));
goto _err;
}
SSubmitReq *pSubmitReq = NULL; SSubmitReq *pSubmitReq = tdBlockToSubmit((const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid,
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId);
pSubmitReq = tdBlockToSubmit((const SArray *)msg, pItem->pTSchema, true, pItem->pTSma->dstTbUid, if (!pSubmitReq) {
pItem->pTSma->dstTbName, pItem->pTSma->dstVgId); smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
}
ASSERT(pSubmitReq); // TODO #if 0
ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14));
ASSERT(!strncasecmp("td.tsma.rst.tb", pItem->pTSma->dstTbName, 14)); #endif
SRpcMsg submitReqMsg = { SRpcMsg submitReqMsg = {
.msgType = TDMT_VND_SUBMIT, .msgType = TDMT_VND_SUBMIT,
@ -170,9 +181,15 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
.contLen = ntohl(pSubmitReq->length), .contLen = ntohl(pSubmitReq->length),
}; };
ASSERT(tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) == 0); if (tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) < 0) {
smaError("vgId:%d, failed to put SubmitReq msg while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
indexUid, tstrerror(terrno));
goto _err;
}
tdUnRefSmaStat(pSma, pStat); tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err:
tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED;
} }


@ -154,10 +154,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
char buf1[50]; char buf1[80];
char buf2[50]; char buf2[80];
tFormatOffset(buf1, 50, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 50, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s", tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
@ -238,8 +238,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal fetchOffsetNew; STqOffsetVal fetchOffsetNew;
// 1.find handle // 1.find handle
char buf[50]; char buf[80];
tFormatOffset(buf, 50, &reqOffset); tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch, tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), buf); TD_VID(pTq->pVnode), buf);
@ -276,7 +276,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf); tqDebug("tmq poll: consumer %ld, offset reset to %s", consumerId, formatBuf);
} else { } else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pReq->useSnapshot) { if (pReq->useSnapshot && pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
if (!pHandle->fetchMeta) { if (!pHandle->fetchMeta) {
tqOffsetResetToData(&fetchOffsetNew, 0, 0); tqOffsetResetToData(&fetchOffsetNew, 0, 0);
} else { } else {
@ -360,7 +360,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->reqOffset.version; metaRsp.reqOffset = pReq->reqOffset.version;
/*tqOffsetResetToLog(&metaR)*/
metaRsp.rspOffset = fetchVer; metaRsp.rspOffset = fetchVer;
metaRsp.resMsgType = pHead->msgType; metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRspLen = pHead->bodyLen;
@ -376,22 +375,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosMemoryFree(pHeadWithCkSum); taosMemoryFree(pHeadWithCkSum);
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) { } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_DATA) {
// 1. set uid and ts tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts);
// 2. get data (rebuild reader if needed)
// 3. get new uid and ts
char formatBuf[50];
tFormatOffset(formatBuf, 50, &dataRsp.reqOffset);
tqInfo("retrieve using snapshot req offset %s", formatBuf);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) { if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0); ASSERT(0);
} }
// 4. send rsp // 4. send rsp
if (dataRsp.blockNum != 0) { if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1;
code = -1;
}
} }
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) { } else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0); ASSERT(0);


@ -267,6 +267,8 @@ int vnodeCommit(SVnode *pVnode) {
return -1; return -1;
} }
pVnode->state.committed = info.state.committed;
// apply the commit (TODO) // apply the commit (TODO)
vnodeBufPoolReset(pVnode->onCommit); vnodeBufPoolReset(pVnode->onCommit);
pVnode->onCommit->next = pVnode->pPool; pVnode->onCommit->next = pVnode->pPool;


@ -118,7 +118,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) { if (order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC) {
__compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc; __compar_fn_t fn = (order == TSDB_ORDER_ASC) ? resultrowComparAsc : resultrowComparDesc;
qsort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn); taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), POINTER_BYTES, fn);
} }
pGroupResInfo->index = 0; pGroupResInfo->index = 0;


@ -1348,7 +1348,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(px->pDataBlock, i);
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
// it is a reserved column for scalar function, and no data in this column yet. // it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL) { if (pDst->pData == NULL || pSrc->pData == NULL) {
continue; continue;
} }
@ -2837,12 +2837,10 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) { if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
tsdbSetTableId(pInfo->dataReader, uid); tsdbSetTableId(pInfo->dataReader, uid);
SQueryTableDataCond tmpCond = pInfo->cond; int64_t oldSkey = pInfo->cond.twindows[0].skey;
tmpCond.twindows[0] = (STimeWindow){ pInfo->cond.twindows[0].skey = ts;
.skey = ts, tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
.ekey = INT64_MAX, pInfo->cond.twindows[0].skey = oldSkey;
};
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0; pInfo->curTWinIdx = 0;
} }


@ -518,7 +518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table // if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
// check status
while (1) { while (1) {
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doTableScanGroup(pOperator);
if (result) { if (result) {
@ -530,7 +529,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
/*pTableInfo->uid */
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;


@ -1246,6 +1246,9 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
pCtx[i].fpSet.init(&pCtx[i], pResInfo); pCtx[i].fpSet.init(&pCtx[i], pResInfo);
} }
} }
SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
setBufPageDirty(bufPage, true);
releaseBufPage(pResultBuf, bufPage);
} }
bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId, bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
@ -3171,6 +3174,10 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex); getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL); step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap)); ASSERT(isInWindow(pCurWin, tsCols[i], gap));
if (pCurWin->pos.pageId == -1) {
// window has been closed.
continue;
}
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput); doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
if (result) { if (result) {
taosArrayPush(result, pCurWin); taosArrayPush(result, pCurWin);
@ -3246,12 +3253,12 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput, setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo); pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo);
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pChWin->pos.pageId); SFilePage* bufPage = getBufPage(pChInfo->streamAggSup.pResultBuf, pChWin->pos.pageId);
setBufPageDirty(bufPage, true); releaseBufPage(pChInfo->streamAggSup.pResultBuf, bufPage);
releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage);
continue; continue;
} else if (!pChWin->isClosed) {
break;
} }
break;
} }
} }
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId); SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId);
@ -3265,7 +3272,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn) { int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup,
SArray* pClosed, __get_win_info_ fn, bool delete) {
// Todo(liuyao) save window to tdb // Todo(liuyao) save window to tdb
void** pIte = NULL; void** pIte = NULL;
size_t keyLen = 0; size_t keyLen = 0;
@ -3279,10 +3287,18 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
if (isCloseWindow(&pSeWin->win, pTwSup)) { if (isCloseWindow(&pSeWin->win, pTwSup)) {
if (!pSeWin->isClosed) { if (!pSeWin->isClosed) {
pSeWin->isClosed = true; pSeWin->isClosed = true;
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed); int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pSeWin->isOutput = true; pSeWin->isOutput = true;
} }
if (delete) {
taosArrayRemove(pWins, i);
i--;
size = taosArrayGetSize(pWins);
}
} }
continue; continue;
} }
@ -3292,6 +3308,16 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete) {
int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete);
}
}
int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) { int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) {
void** pIte = NULL; void** pIte = NULL;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
@ -3339,6 +3365,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "Final Session Recv" : "Single Session Recv");
if (pBlock->info.type == STREAM_CLEAR) { if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
@ -3385,7 +3412,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession); closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForSession, pInfo->ignoreCloseWindow);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
copyUpdateResult(pStUpdated, pUpdated); copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated); taosHashCleanup(pStUpdated);
@ -3437,10 +3466,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "Semi Session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) { if (pBInfo->pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) { if (pInfo->pUpdateRes->info.rows == 0) {
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
@ -3449,9 +3479,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} }
// process the rest of the data // process the rest of the data
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
printDataBlock(pInfo->pUpdateRes, "Semi Session");
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
return pInfo->binfo.pRes; printDataBlock(pBInfo->pRes, "Semi Session");
return pBInfo->pRes;
} }
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -3495,21 +3527,24 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pSup->rowEntryInfoOffset); pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "Semi Session");
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) { if (pBInfo->pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) { if (pInfo->pUpdateRes->info.rows == 0) {
return NULL; return NULL;
} }
// process the rest of the data // process the rest of the data
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
printDataBlock(pInfo->pUpdateRes, "Semi Session");
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} }
printDataBlock(pBInfo->pRes, "Semi Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
} }
@ -3867,7 +3902,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState); closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForState, pInfo->ignoreCloseWindow);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
copyUpdateResult(pSeUpdated, pUpdated); copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated); taosHashCleanup(pSeUpdated);


@ -117,6 +117,9 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getFirstLastInfoSize(int32_t resBytes); int32_t getFirstLastInfoSize(int32_t resBytes);
int32_t lastRowFunction(SqlFunctionCtx *pCtx);
int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);


@ -1954,15 +1954,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "last_row", .name = "last_row",
.type = FUNCTION_TYPE_LAST_ROW, .type = FUNCTION_TYPE_LAST_ROW,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast, .translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = lastFunction, .processFunc = lastRowFunction,
.finalizeFunc = firstLastFinalize, .finalizeFunc = lastRowFinalize,
.pPartialFunc = "_last_partial",
.pMergeFunc = "_last_merge",
.combineFunc = lastCombine,
}, },
{ {
.name = "_cache_last_row", .name = "_cache_last_row",


@ -80,6 +80,7 @@ typedef struct STopBotRes {
typedef struct SFirstLastRes { typedef struct SFirstLastRes {
bool hasResult; bool hasResult;
bool isNull; //used for last_row function only
int32_t bytes; int32_t bytes;
char buf[]; char buf[];
} SFirstLastRes; } SFirstLastRes;
@ -2764,6 +2765,113 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
int32_t blockDataOrder = (startKey <= endKey) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
if (blockDataOrder == TSDB_ORDER_ASC) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
pInfo->isNull = true;
} else {
pInfo->isNull = false;
if (IS_VAR_DATA_TYPE(type)) {
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf + sizeof(TSKEY), data, bytes);
}
*(TSKEY*)(pInfo->buf) = cts;
numOfElems++;
//handle selectivity
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
}
break;
}
} else { // descending order
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) {
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
pInfo->isNull = true;
} else {
pInfo->isNull = false;
if (IS_VAR_DATA_TYPE(type)) {
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf + sizeof(TSKEY), data, bytes);
}
*(TSKEY*)(pInfo->buf) = cts;
numOfElems++;
//handle selectivity
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
//DO_UPDATE_TAG_COLUMNS(pCtx, ts);
}
break;
}
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, pRes->buf + sizeof(TSKEY), pRes->isNull);
//handle selectivity
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
return pResInfo->numOfRes;
}
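Reading lastRowFunction and lastRowFinalize together, the flexible `buf[]` member of SFirstLastRes is used as a packed record. The sketch below spells out the layout as these two functions appear to interpret it; it is a reading of the code above, not a struct that exists in the source:

```c
/*
 * SFirstLastRes.buf layout as used by lastRowFunction / lastRowFinalize:
 *
 *   offset 0                     TSKEY      timestamp of the newest row seen so far
 *   offset sizeof(TSKEY)         value      the row's value bytes (varDataTLen() for var types)
 *   offset sizeof(TSKEY)+bytes   STuplePos  position of the source tuple, written only when
 *                                           selectivity (subsidiary) columns are present
 *
 * When the newest row is NULL, isNull is set and the value bytes are left as-is;
 * lastRowFinalize then appends a NULL instead of reading them.
 */
```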
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDiffInfo); pEnv->calcMemSize = sizeof(SDiffInfo);
return true; return true;


@ -1887,10 +1887,10 @@ static void top_bottom_func_finalizer(SqlFunctionCtx *pCtx) {
// user specify the order of output by sort the result according to timestamp // user specify the order of output by sort the result according to timestamp
if (pCtx->param[1].param.i == PRIMARYKEY_TIMESTAMP_COL_ID) { if (pCtx->param[1].param.i == PRIMARYKEY_TIMESTAMP_COL_ID) {
__compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn; __compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resAscComparFn : resDescComparFn;
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); taosSort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
} else /*if (pCtx->param[1].param.i > PRIMARYKEY_TIMESTAMP_COL_ID)*/ { } else /*if (pCtx->param[1].param.i > PRIMARYKEY_TIMESTAMP_COL_ID)*/ {
__compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn; __compar_fn_t comparator = (pCtx->param[2].param.i == TSDB_ORDER_ASC) ? resDataAscComparFn : resDataDescComparFn;
qsort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator); taosSort(tvp, (size_t)pResInfo->numOfRes, POINTER_BYTES, comparator);
} }
GET_TRUE_DATA_TYPE(); GET_TRUE_DATA_TYPE();


@ -44,7 +44,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
offset += (int32_t)(pg->num * pMemBucket->bytes); offset += (int32_t)(pg->num * pMemBucket->bytes);
} }
qsort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn); taosSort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn);
return buffer; return buffer;
} }


@ -1728,7 +1728,6 @@ static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) {
} else { } else {
pCxt->hasOtherCol = true; pCxt->hasOtherCol = true;
} }
return *((bool*)pContext) ? DEAL_RES_CONTINUE : DEAL_RES_END;
} }
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }


@ -790,11 +790,11 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
pColIdx[i].schemaColIdx = pColList->boundColumns[i]; pColIdx[i].schemaColIdx = pColList->boundColumns[i];
pColIdx[i].boundIdx = i; pColIdx[i].boundIdx = i;
} }
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
for (col_id_t i = 0; i < pColList->numOfBound; ++i) { for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].finalIdx = i; pColIdx[i].finalIdx = i;
} }
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
} }
if (pColList->numOfCols > pColList->numOfBound) { if (pColList->numOfCols > pColList->numOfBound) {
@ -2232,11 +2232,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
pColIdx[i].schemaColIdx = pColList->boundColumns[i]; pColIdx[i].schemaColIdx = pColList->boundColumns[i];
pColIdx[i].boundIdx = i; pColIdx[i].boundIdx = i;
} }
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
for (col_id_t i = 0; i < pColList->numOfBound; ++i) { for (col_id_t i = 0; i < pColList->numOfBound; ++i) {
pColIdx[i].finalIdx = i; pColIdx[i].finalIdx = i;
} }
qsort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); taosSort(pColIdx, pColList->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
} }
if (pColList->numOfCols > pColList->numOfBound) { if (pColList->numOfCols > pColList->numOfBound) {


@ -295,7 +295,7 @@ void sortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf) {
char* pBlockData = pBlocks->data; char* pBlockData = pBlocks->data;
// todo. qsort is unstable, if timestamp is same, should get the last one // todo. qsort is unstable, if timestamp is same, should get the last one
qsort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar); taosSort(pBlockData, pBlocks->numOfRows, dataBuf->rowSize, rowDataCompar);
int32_t i = 0; int32_t i = 0;
int32_t j = 1; int32_t j = 1;
@ -365,7 +365,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKey
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
// todo. qsort is unstable, if timestamp is same, should get the last one // todo. qsort is unstable, if timestamp is same, should get the last one
qsort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable); taosSort(pBlkKeyTuple, nRows, sizeof(SBlockKeyTuple), rowDataComparStable);
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
int32_t i = 0; int32_t i = 0;


@ -787,6 +787,13 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
if (pVal->placeholderNo > 0 || pVal->isNull) { if (pVal->placeholderNo > 0 || pVal->isNull) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
if (TSDB_DATA_TYPE_NULL == pVal->node.resType.type) {
// TODO
//pVal->node.resType = targetDt;
pVal->translate = true;
pVal->isNull = true;
return DEAL_RES_CONTINUE;
}
if (pVal->isDuration) { if (pVal->isDuration) {
if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit, precision) != if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit, precision) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
@ -5360,7 +5367,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto end; goto end;
} }
} else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) { } else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL && !pVal->isNull) {
char* tmpVal = nodesGetValueFromNode(pVal); char* tmpVal = nodesGetValueFromNode(pVal);
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type}; STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
if (IS_VAR_DATA_TYPE(pTagSchema->type)) { if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
@ -5627,8 +5634,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (DEAL_RES_ERROR == SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema);
translateValueImpl(pCxt, pStmt->pVal, schemaToDataType(pTableMeta->tableInfo.precision, pSchema))) { if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt)) {
return pCxt->errCode; return pCxt->errCode;
} }
@ -5637,7 +5644,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
} }
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type); pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) { if (targetDt.type == TSDB_DATA_TYPE_JSON) {
pReq->isNull = 0;
if (pStmt->pVal->literal && if (pStmt->pVal->literal &&
strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal); return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal);


@ -586,11 +586,160 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
return code; return code;
} }
static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) { typedef struct SPartAggCondContext {
// todo SAggLogicNode* pAgg;
bool hasAggFunc;
} SPartAggCondContext;
static EDealRes partAggCondHasAggFuncImpl(SNode* pNode, void* pContext) {
SPartAggCondContext* pCxt = pContext;
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SNode* pAggFunc = NULL;
FOREACH(pAggFunc, pCxt->pAgg->pAggFuncs) {
if (strcmp(((SColumnNode*)pNode)->colName, ((SFunctionNode*)pAggFunc)->node.aliasName) == 0) {
pCxt->hasAggFunc = true;
return DEAL_RES_END;
}
}
}
return DEAL_RES_CONTINUE;
}
static int32_t partitionAggCondHasAggFunc(SAggLogicNode* pAgg, SNode* pCond) {
SPartAggCondContext cxt = {.pAgg = pAgg, .hasAggFunc = false};
nodesWalkExpr(pCond, partAggCondHasAggFuncImpl, &cxt);
return cxt.hasAggFunc;
}
static int32_t partitionAggCondConj(SAggLogicNode* pAgg, SNode** ppAggFuncCond, SNode** ppGroupKeyCond) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pAgg->node.pConditions;
int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pAggFuncConds = NULL;
SNodeList* pGroupKeyConds = NULL;
SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
if (partitionAggCondHasAggFunc(pAgg, pCond)) {
code = nodesListMakeAppend(&pAggFuncConds, nodesCloneNode(pCond));
} else {
code = nodesListMakeAppend(&pGroupKeyConds, nodesCloneNode(pCond));
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
SNode* pTempAggFuncCond = NULL;
SNode* pTempGroupKeyCond = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempAggFuncCond, &pAggFuncConds);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesMergeConds(&pTempGroupKeyCond, &pGroupKeyConds);
}
if (TSDB_CODE_SUCCESS == code) {
*ppAggFuncCond = pTempAggFuncCond;
*ppGroupKeyCond = pTempGroupKeyCond;
} else {
nodesDestroyList(pAggFuncConds);
nodesDestroyList(pGroupKeyConds);
nodesDestroyNode(pTempAggFuncCond);
nodesDestroyNode(pTempGroupKeyCond);
}
pAgg->node.pConditions = NULL;
return code;
}
static int32_t partitionAggCond(SAggLogicNode* pAgg, SNode** ppAggFunCond, SNode** ppGroupKeyCond) {
SNode* pAggNodeCond = pAgg->node.pConditions;
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pAggNodeCond) &&
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pAggNodeCond))->condType) {
return partitionAggCondConj(pAgg, ppAggFunCond, ppGroupKeyCond);
}
if (partitionAggCondHasAggFunc(pAgg, pAggNodeCond)) {
*ppAggFunCond = pAggNodeCond;
} else {
*ppGroupKeyCond = pAggNodeCond;
}
pAgg->node.pConditions = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t pushCondToAggCond(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode** pAggFuncCond) {
pushDownCondOptAppendCond(&pAgg->node.pConditions, pAggFuncCond);
return TSDB_CODE_SUCCESS;
}
typedef struct SRewriteAggGroupKeyCondContext{
SAggLogicNode *pAgg;
int32_t errCode;
} SRewriteAggGroupKeyCondContext;
static EDealRes rewriteAggGroupKeyCondForPushDownImpl(SNode** pNode, void* pContext) {
SRewriteAggGroupKeyCondContext* pCxt = pContext;
SAggLogicNode* pAgg = pCxt->pAgg;
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
SNode* pGroupKey = NULL;
FOREACH(pGroupKey, pAgg->pGroupKeys) {
SNode* pGroup = NULL;
FOREACH(pGroup, ((SGroupingSetNode*)pGroupKey)->pParameterList) {
if (0 == strcmp(((SExprNode*)pGroup)->aliasName, ((SColumnNode*)(*pNode))->colName)) {
SNode* pExpr = nodesCloneNode(pGroup);
if (pExpr == NULL) {
pCxt->errCode = terrno;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*pNode);
*pNode = pExpr;
}
}
}
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t rewriteAggGroupKeyCondForPushDown(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SNode* pGroupKeyCond) {
SRewriteAggGroupKeyCondContext cxt = {.pAgg = pAgg, .errCode = TSDB_CODE_SUCCESS};
nodesRewriteExpr(&pGroupKeyCond, rewriteAggGroupKeyCondForPushDownImpl, &cxt);
return cxt.errCode;
}
static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAgg) {
if (NULL == pAgg->node.pConditions ||
OPTIMIZE_FLAG_TEST_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS;
}
//TODO: remove it after full implementation of pushing down to child
if (1 != LIST_LENGTH(pAgg->node.pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) {
return TSDB_CODE_SUCCESS;
}
SNode* pAggFuncCond = NULL;
SNode* pGroupKeyCond = NULL;
int32_t code = partitionAggCond(pAgg, &pAggFuncCond, &pGroupKeyCond);
if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncCond) {
code = pushCondToAggCond(pCxt, pAgg, &pAggFuncCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) {
code = rewriteAggGroupKeyCondForPushDown(pCxt, pAgg, pGroupKeyCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeyCond) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
code = pushDownCondOptPushCondToChild(pCxt, pChild, &pGroupKeyCond);
}
if (TSDB_CODE_SUCCESS == code) {
OPTIMIZE_FLAG_SET_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE);
pCxt->optimized = true;
} else {
nodesDestroyNode(pGroupKeyCond);
nodesDestroyNode(pAggFuncCond);
}
return code;
}
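To make the new condition pushdown concrete, here is a commented example of how partitionAggCond and rewriteAggGroupKeyCondForPushDown split a conjunction; the query is illustrative only, and the HAVING mapping is my reading of the planner rather than something stated in this diff:

```c
/*
 * Illustrative only. For a query along the lines of
 *
 *   SELECT avg(v) FROM t GROUP BY tag1 HAVING avg(v) > 1 AND tag1 = 'a'
 *
 * the AND conjunction sitting on the aggregate node is split:
 *   - "avg(v) > 1" references an aggregate output (matched by alias name),
 *     so it stays as a condition on the aggregate node;
 *   - "tag1 = 'a'" references only a group key, so the column is rewritten
 *     to the underlying group-key expression and the condition is pushed
 *     down to the child scan node.
 * For now the optimization only fires when the aggregate's single child is a scan.
 */
```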
static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pLogicNode)) { switch (nodeType(pLogicNode)) {
@ -1707,8 +1856,8 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) { static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0); SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt); nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);
int32_t code = cxt.errCode; int32_t code = cxt.errCode;


@ -2059,7 +2059,7 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t
} }
if (colIdxi > 1) { if (colIdxi > 1) {
qsort(colIdx, colIdxi, sizeof(uint32_t), getComparFunc(TSDB_DATA_TYPE_USMALLINT, 0)); taosSort(colIdx, colIdxi, sizeof(uint32_t), getComparFunc(TSDB_DATA_TYPE_USMALLINT, 0));
} }
for (uint32_t l = 0; l < colIdxi; ++l) { for (uint32_t l = 0; l < colIdxi; ++l) {
@ -2294,7 +2294,7 @@ int32_t filterMergeGroups(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t *gR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
qsort(gRes, *gResNum, POINTER_BYTES, filterCompareGroupCtx); taosSort(gRes, *gResNum, POINTER_BYTES, filterCompareGroupCtx);
int32_t pEnd = 0, cStart = 0, cEnd = 0; int32_t pEnd = 0, cStart = 0, cEnd = 0;
uint32_t pColNum = 0, cColNum = 0; uint32_t pColNum = 0, cColNum = 0;


@ -287,7 +287,7 @@ int32_t sclInitParamList(SScalarParam **pParams, SNodeList* pParamList, SScalarC
int32_t code = 0; int32_t code = 0;
if (NULL == pParamList) { if (NULL == pParamList) {
if (ctx->pBlockList) { if (ctx->pBlockList) {
SSDataBlock *pBlock = taosArrayGet(ctx->pBlockList, 0); SSDataBlock *pBlock = taosArrayGetP(ctx->pBlockList, 0);
*rowNum = pBlock->info.rows; *rowNum = pBlock->info.rows;
} else { } else {
*rowNum = 1; *rowNum = 1;


@ -607,9 +607,10 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
if (clear) { if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) { if (!uv_is_closing((uv_handle_t*)conn->stream)) {
uv_close((uv_handle_t*)conn->stream, cliDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
} else {
cliDestroy((uv_handle_t*)conn->stream);
} }
//} else {
// cliDestroy((uv_handle_t*)conn->stream);
//}
} }
} }
static void cliDestroy(uv_handle_t* handle) { static void cliDestroy(uv_handle_t* handle) {


@ -139,6 +139,7 @@ static void uvHandleResp(SSvrMsg* msg, SWorkThrd* thrd);
static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd);
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrd* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister, NULL}; uvHandleRegister, NULL};
static void uvDestroyConn(uv_handle_t* handle); static void uvDestroyConn(uv_handle_t* handle);
// server and worker thread // server and worker thread
@ -775,9 +776,10 @@ static void destroyConn(SSvrConn* conn, bool clear) {
if (!uv_is_closing((uv_handle_t*)conn->pTcp)) { if (!uv_is_closing((uv_handle_t*)conn->pTcp)) {
tTrace("conn %p to be destroyed", conn); tTrace("conn %p to be destroyed", conn);
uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn);
} else {
uvDestroyConn((uv_handle_t*)conn->pTcp);
} }
//} else {
// uvDestroyConn((uv_handle_t*)conn->pTcp);
//}
} }
} }
static void destroyConnRegArg(SSvrConn* conn) { static void destroyConnRegArg(SSvrConn* conn) {


@ -19,6 +19,10 @@
#include "tref.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
}
bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; } bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; }
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }

source/os/src/osMath.c (new file, 47 lines added)

@ -0,0 +1,47 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
#ifdef WINDOWS
void swapStr(char* j, char* J, int width) {
int i;
char tmp;
for (i = 0; i < width; i++) {
tmp = *j;
*j = *J;
*J = tmp;
j++;
J++;
}
}
#endif
void taosSort(void* arr, int64_t sz, int64_t width, __compar_fn_t compar) {
#ifdef WINDOWS
int64_t i, j;
for (i = 0; i < sz - 1; i++) {
for (j = 0; j < sz - 1 - i; j++) {
if (compar((char*)arr + j * width, (char*)arr + (j + 1) * width) > 0.00) {
swapStr((char*)arr + j * width, (char*)arr + (j + 1) * width, width);
}
}
}
#else
qsort(arr, sz, width, compar);
#endif
}
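As a quick usage sketch: taosSort is a drop-in replacement for the qsort calls it supersedes throughout this commit. The standalone example below assumes it is built inside the TDengine tree so that "os.h" resolves, and is illustrative only.

```c
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include "os.h"  // declares taosSort and __compar_fn_t (see the header change above)

// A comparator matching the __compar_fn_t signature.
static int32_t cmpInt32(const void *a, const void *b) {
  int32_t x = *(const int32_t *)a, y = *(const int32_t *)b;
  return (x > y) - (x < y);
}

int main(void) {
  int32_t vals[] = {42, 7, 19, 7, 3};
  int64_t n = sizeof(vals) / sizeof(vals[0]);
  taosSort(vals, n, sizeof(vals[0]), cmpInt32);
  for (int64_t i = 0; i < n; ++i) {
    printf("%" PRId32 " ", vals[i]);  // prints: 3 7 7 19 42
  }
  printf("\n");
  return 0;
}
```

Note that on Windows the implementation above is a simple O(n^2) exchange sort, so very large arrays will sort noticeably slower there than through qsort.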


@ -373,7 +373,7 @@ void taosArraySort(SArray* pArray, __compar_fn_t compar) {
assert(pArray != NULL); assert(pArray != NULL);
assert(compar != NULL); assert(compar != NULL);
qsort(pArray->pData, pArray->size, pArray->elemSize, compar); taosSort(pArray->pData, pArray->size, pArray->elemSize, compar);
} }
void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) { void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) {
@ -390,7 +390,7 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t
void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) { void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
assert(pArray != NULL); assert(pArray != NULL);
qsort(pArray->pData, pArray->size, pArray->elemSize, comparFn); taosSort(pArray->pData, pArray->size, pArray->elemSize, comparFn);
} }
char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t comparFn, int32_t flags) { char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t comparFn, int32_t flags) {


@ -124,7 +124,7 @@ void tdigestCompress(TDigest *t) {
t->num_buffered_pts = 0; t->num_buffered_pts = 0;
t->total_weight += unmerged_weight; t->total_weight += unmerged_weight;
qsort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid); taosSort(unmerged_centroids, num_unmerged, sizeof(SCentroid), cmpCentroid);
memset(&args, 0, sizeof(SMergeArgs)); memset(&args, 0, sizeof(SMergeArgs));
args.centroids = (SCentroid*)taosMemoryMalloc((size_t)(sizeof(SCentroid) * t->size)); args.centroids = (SCentroid*)taosMemoryMalloc((size_t)(sizeof(SCentroid) * t->size));
memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size)); memset(args.centroids, 0, (size_t)(sizeof(SCentroid) * t->size));


@ -614,7 +614,7 @@ static int32_t taosCompareTaosError(const void* a, const void* b) {
static TdThreadOnce tsErrorInit = PTHREAD_ONCE_INIT; static TdThreadOnce tsErrorInit = PTHREAD_ONCE_INIT;
static void tsSortError(void) { static void tsSortError(void) {
qsort(errors, sizeof(errors) / sizeof(errors[0]), sizeof(errors[0]), taosCompareTaosError); taosSort(errors, sizeof(errors) / sizeof(errors[0]), sizeof(errors[0]), taosCompareTaosError);
} }
const char* tstrerror(int32_t err) { const char* tstrerror(int32_t err) {


@ -16,24 +16,24 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tlrucache.h" #include "tlrucache.h"
#include "os.h" #include "os.h"
#include "tdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h"
#include "tarray.h" #include "tarray.h"
#include "tdef.h"
#include "tlog.h"
typedef struct SLRUEntry SLRUEntry; typedef struct SLRUEntry SLRUEntry;
typedef struct SLRUEntryTable SLRUEntryTable; typedef struct SLRUEntryTable SLRUEntryTable;
typedef struct SLRUCacheShard SLRUCacheShard; typedef struct SLRUCacheShard SLRUCacheShard;
typedef struct SShardedCache SShardedCache; typedef struct SShardedCache SShardedCache;
enum { enum {
TAOS_LRU_IN_CACHE = (1 << 0), // Whether this entry is referenced by the hash table. TAOS_LRU_IN_CACHE = (1 << 0), // Whether this entry is referenced by the hash table.
TAOS_LRU_IS_HIGH_PRI = (1 << 1), // Whether this entry is high priority entry. TAOS_LRU_IS_HIGH_PRI = (1 << 1), // Whether this entry is high priority entry.
TAOS_LRU_IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry is in high-pri pool. TAOS_LRU_IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry is in high-pri pool.
TAOS_LRU_HAS_HIT = (1 << 3), // Whether this entry has had any lookups (hits). TAOS_LRU_HAS_HIT = (1 << 3), // Whether this entry has had any lookups (hits).
}; };
struct SLRUEntry { struct SLRUEntry {
@ -50,18 +50,39 @@ struct SLRUEntry {
char keyData[1]; char keyData[1];
}; };
#define TAOS_LRU_ENTRY_IN_CACHE(h) ((h)->flags & TAOS_LRU_IN_CACHE) #define TAOS_LRU_ENTRY_IN_CACHE(h) ((h)->flags & TAOS_LRU_IN_CACHE)
#define TAOS_LRU_ENTRY_IN_HIGH_POOL(h) ((h)->flags & TAOS_LRU_IN_HIGH_PRI_POOL) #define TAOS_LRU_ENTRY_IN_HIGH_POOL(h) ((h)->flags & TAOS_LRU_IN_HIGH_PRI_POOL)
#define TAOS_LRU_ENTRY_IS_HIGH_PRI(h) ((h)->flags & TAOS_LRU_IS_HIGH_PRI) #define TAOS_LRU_ENTRY_IS_HIGH_PRI(h) ((h)->flags & TAOS_LRU_IS_HIGH_PRI)
#define TAOS_LRU_ENTRY_HAS_HIT(h) ((h)->flags & TAOS_LRU_HAS_HIT) #define TAOS_LRU_ENTRY_HAS_HIT(h) ((h)->flags & TAOS_LRU_HAS_HIT)
#define TAOS_LRU_ENTRY_SET_IN_CACHE(h, inCache) do { if(inCache) {(h)->flags |= TAOS_LRU_IN_CACHE;} else {(h)->flags &= ~TAOS_LRU_IN_CACHE;} } while(0) #define TAOS_LRU_ENTRY_SET_IN_CACHE(h, inCache) \
#define TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(h, inHigh) do { if(inHigh) {(h)->flags |= TAOS_LRU_IN_HIGH_PRI_POOL;} else {(h)->flags &= ~TAOS_LRU_IN_HIGH_PRI_POOL;} } while(0) do { \
#define TAOS_LRU_ENTRY_SET_PRIORITY(h, priority) do { if(priority == TAOS_LRU_PRIORITY_HIGH) {(h)->flags |= TAOS_LRU_IS_HIGH_PRI;} else {(h)->flags &= ~TAOS_LRU_IS_HIGH_PRI;} } while(0) if (inCache) { \
(h)->flags |= TAOS_LRU_IN_CACHE; \
} else { \
(h)->flags &= ~TAOS_LRU_IN_CACHE; \
} \
} while (0)
#define TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(h, inHigh) \
do { \
if (inHigh) { \
(h)->flags |= TAOS_LRU_IN_HIGH_PRI_POOL; \
} else { \
(h)->flags &= ~TAOS_LRU_IN_HIGH_PRI_POOL; \
} \
} while (0)
#define TAOS_LRU_ENTRY_SET_PRIORITY(h, priority) \
do { \
if (priority == TAOS_LRU_PRIORITY_HIGH) { \
(h)->flags |= TAOS_LRU_IS_HIGH_PRI; \
} else { \
(h)->flags &= ~TAOS_LRU_IS_HIGH_PRI; \
} \
} while (0)
#define TAOS_LRU_ENTRY_SET_HIT(h) ((h)->flags |= TAOS_LRU_HAS_HIT) #define TAOS_LRU_ENTRY_SET_HIT(h) ((h)->flags |= TAOS_LRU_HAS_HIT)
#define TAOS_LRU_ENTRY_HAS_REFS(h) ((h)->refs > 0) #define TAOS_LRU_ENTRY_HAS_REFS(h) ((h)->refs > 0)
#define TAOS_LRU_ENTRY_REF(h) (++(h)->refs) #define TAOS_LRU_ENTRY_REF(h) (++(h)->refs)
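The single-line flag macros above are reformatted into multi-line `do { ... } while (0)` blocks. A small, self-contained sketch with simplified stand-in names shows why that wrapper matters: the macro keeps behaving as one statement even inside an unbraced `if`/`else`:

```c
#include <stdint.h>
#include <stdio.h>

#define IN_CACHE (1 << 0) /* stand-in for TAOS_LRU_IN_CACHE */

typedef struct {
  uint8_t flags;
} Entry;

/* Same do/while(0) pattern as the reformatted macros above. */
#define ENTRY_SET_IN_CACHE(h, inCache) \
  do {                                 \
    if (inCache) {                     \
      (h)->flags |= IN_CACHE;          \
    } else {                           \
      (h)->flags &= ~IN_CACHE;         \
    }                                  \
  } while (0)

int main(void) {
  Entry e = {0};
  int   inserted = 1;
  /* Without the do/while(0) wrapper, the trailing `else` below would attach to
     the macro's internal if/else and the extra semicolon would break the build. */
  if (inserted)
    ENTRY_SET_IN_CACHE(&e, 1);
  else
    ENTRY_SET_IN_CACHE(&e, 0);
  printf("flags=0x%x\n", e.flags); /* prints flags=0x1 */
  return 0;
}
```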
static bool taosLRUEntryUnref(SLRUEntry *entry) { static bool taosLRUEntryUnref(SLRUEntry *entry) {
assert(entry->refs > 0); assert(entry->refs > 0);
@ -90,7 +111,7 @@ struct SLRUEntryTable {
static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) { static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) {
table->lengthBits = 4; table->lengthBits = 4;
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry*)); table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
if (!table->list) { if (!table->list) {
return -1; return -1;
} }
@ -125,7 +146,7 @@ static void taosLRUEntryTableCleanup(SLRUEntryTable *table) {
taosMemoryFree(table->list); taosMemoryFree(table->list);
} }
static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)]; SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)];
while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) { while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) {
entry = &(*entry)->nextHash; entry = &(*entry)->nextHash;
@ -134,7 +155,7 @@ static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable * table, const void *
return entry; return entry;
} }
static void taosLRUEntryTableResize(SLRUEntryTable * table) { static void taosLRUEntryTableResize(SLRUEntryTable *table) {
int lengthBits = table->lengthBits; int lengthBits = table->lengthBits;
if (lengthBits >= table->maxLengthBits) { if (lengthBits >= table->maxLengthBits) {
return; return;
@ -144,9 +165,9 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) {
return; return;
} }
uint32_t oldLength = 1 << lengthBits; uint32_t oldLength = 1 << lengthBits;
int newLengthBits = lengthBits + 1; int newLengthBits = lengthBits + 1;
SLRUEntry **newList = taosMemoryCalloc(1 << newLengthBits, sizeof(SLRUEntry*)); SLRUEntry **newList = taosMemoryCalloc(1 << newLengthBits, sizeof(SLRUEntry *));
if (!newList) { if (!newList) {
return; return;
} }
@ -154,8 +175,8 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) {
for (uint32_t i = 0; i < oldLength; ++i) { for (uint32_t i = 0; i < oldLength; ++i) {
SLRUEntry *entry = table->list[i]; SLRUEntry *entry = table->list[i];
while (entry) { while (entry) {
SLRUEntry *next = entry->nextHash; SLRUEntry *next = entry->nextHash;
uint32_t hash = entry->hash; uint32_t hash = entry->hash;
SLRUEntry **ptr = &newList[hash >> (32 - newLengthBits)]; SLRUEntry **ptr = &newList[hash >> (32 - newLengthBits)];
entry->nextHash = *ptr; entry->nextHash = *ptr;
*ptr = entry; *ptr = entry;
@ -170,13 +191,13 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) {
table->lengthBits = newLengthBits; table->lengthBits = newLengthBits;
} }
static SLRUEntry *taosLRUEntryTableLookup(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { static SLRUEntry *taosLRUEntryTableLookup(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
return *taosLRUEntryTableFindPtr(table, key, keyLen, hash); return *taosLRUEntryTableFindPtr(table, key, keyLen, hash);
} }
static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable * table, SLRUEntry *entry) { static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable *table, SLRUEntry *entry) {
SLRUEntry **ptr = taosLRUEntryTableFindPtr(table, entry->keyData, entry->keyLength, entry->hash); SLRUEntry **ptr = taosLRUEntryTableFindPtr(table, entry->keyData, entry->keyLength, entry->hash);
SLRUEntry *old = *ptr; SLRUEntry *old = *ptr;
entry->nextHash = (old == NULL) ? NULL : old->nextHash; entry->nextHash = (old == NULL) ? NULL : old->nextHash;
*ptr = entry; *ptr = entry;
if (old == NULL) { if (old == NULL) {
@ -189,9 +210,9 @@ static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable * table, SLRUEntry *ent
return old; return old;
} }
static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) { static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
SLRUEntry **entry = taosLRUEntryTableFindPtr(table, key, keyLen, hash); SLRUEntry **entry = taosLRUEntryTableFindPtr(table, key, keyLen, hash);
SLRUEntry *result = *entry; SLRUEntry *result = *entry;
if (result) { if (result) {
*entry = result->nextHash; *entry = result->nextHash;
--table->elems; --table->elems;
@ -201,17 +222,17 @@ static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable * table, const void *ke
} }
struct SLRUCacheShard { struct SLRUCacheShard {
size_t capacity; size_t capacity;
size_t highPriPoolUsage; size_t highPriPoolUsage;
bool strictCapacity; bool strictCapacity;
double highPriPoolRatio; double highPriPoolRatio;
double highPriPoolCapacity; double highPriPoolCapacity;
SLRUEntry lru; SLRUEntry lru;
SLRUEntry *lruLowPri; SLRUEntry *lruLowPri;
SLRUEntryTable table; SLRUEntryTable table;
size_t usage; // Memory size for entries residing in the cache. size_t usage; // Memory size for entries residing in the cache.
size_t lruUsage; // Memory size for entries residing only in the LRU list. size_t lruUsage; // Memory size for entries residing only in the LRU list.
TdThreadMutex mutex; TdThreadMutex mutex;
}; };
#define TAOS_LRU_CACHE_SHARD_HASH32(key, len) (MurmurHash3_32((key), (len))) #define TAOS_LRU_CACHE_SHARD_HASH32(key, len) (MurmurHash3_32((key), (len)))
@ -231,8 +252,7 @@ static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) {
assert(e->next == NULL); assert(e->next == NULL);
assert(e->prev == NULL); assert(e->prev == NULL);
if (shard->highPriPoolRatio > 0 if (shard->highPriPoolRatio > 0 && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) {
&& (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) {
e->next = &shard->lru; e->next = &shard->lru;
e->prev = shard->lru.prev; e->prev = shard->lru.prev;
@ -309,8 +329,8 @@ static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity)
taosArrayDestroy(lastReferenceList); taosArrayDestroy(lastReferenceList);
} }
static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio,
double highPriPoolRatio, int maxUpperHashBits) { int maxUpperHashBits) {
if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) { if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) {
return -1; return -1;
} }
@ -341,9 +361,10 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) {
taosLRUEntryTableCleanup(&shard->table); taosLRUEntryTableCleanup(&shard->table);
} }
static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) { static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle,
bool freeOnFail) {
LRUStatus status = TAOS_LRU_STATUS_OK; LRUStatus status = TAOS_LRU_STATUS_OK;
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
taosThreadMutexLock(&shard->mutex); taosThreadMutexLock(&shard->mutex);
@ -355,9 +376,9 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
taosArrayPush(lastReferenceList, &e); taosArrayPush(lastReferenceList, &e);
} else { } else {
if (freeOnFail) { if (freeOnFail) {
taosMemoryFree(e); taosMemoryFree(e);
*handle = NULL; *handle = NULL;
} }
status = TAOS_LRU_STATUS_INCOMPLETE; status = TAOS_LRU_STATUS_INCOMPLETE;
@ -371,21 +392,21 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
assert(TAOS_LRU_ENTRY_IN_CACHE(old)); assert(TAOS_LRU_ENTRY_IN_CACHE(old));
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false); TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
taosLRUCacheShardLRURemove(shard, old); taosLRUCacheShardLRURemove(shard, old);
assert(shard->usage >= old->totalCharge); assert(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
taosArrayPush(lastReferenceList, &old); taosArrayPush(lastReferenceList, &old);
} }
} }
if (handle == NULL) { if (handle == NULL) {
taosLRUCacheShardLRUInsert(shard, e); taosLRUCacheShardLRUInsert(shard, e);
} else { } else {
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) { if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
TAOS_LRU_ENTRY_REF(e); TAOS_LRU_ENTRY_REF(e);
} }
*handle = (LRUHandle*) e; *handle = (LRUHandle *)e;
} }
} }
@ -402,8 +423,8 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
} }
static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash, static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash,
void *value, size_t charge, _taos_lru_deleter_t deleter, void *value, size_t charge, _taos_lru_deleter_t deleter, LRUHandle **handle,
LRUHandle **handle, LRUPriority priority) { LRUPriority priority) {
SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen); SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen);
if (!e) { if (!e) {
return TAOS_LRU_STATUS_FAIL; return TAOS_LRU_STATUS_FAIL;
@ -442,7 +463,7 @@ static LRUHandle *taosLRUCacheShardLookup(SLRUCacheShard *shard, const void *key
taosThreadMutexUnlock(&shard->mutex); taosThreadMutexUnlock(&shard->mutex);
return (LRUHandle *) e; return (LRUHandle *)e;
} }
static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash) { static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash) {
@ -498,7 +519,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
} }
static bool taosLRUCacheShardRef(SLRUCacheShard *shard, LRUHandle *handle) { static bool taosLRUCacheShardRef(SLRUCacheShard *shard, LRUHandle *handle) {
SLRUEntry *e = (SLRUEntry *) handle; SLRUEntry *e = (SLRUEntry *)handle;
taosThreadMutexLock(&shard->mutex); taosThreadMutexLock(&shard->mutex);
assert(TAOS_LRU_ENTRY_HAS_REFS(e)); assert(TAOS_LRU_ENTRY_HAS_REFS(e));
@ -514,8 +535,8 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
return false; return false;
} }
SLRUEntry *e = (SLRUEntry *) handle; SLRUEntry *e = (SLRUEntry *)handle;
bool lastReference = false; bool lastReference = false;
taosThreadMutexLock(&shard->mutex); taosThreadMutexLock(&shard->mutex);
@ -579,11 +600,11 @@ static void taosLRUCacheShardSetStrictCapacity(SLRUCacheShard *shard, bool stric
} }
struct SShardedCache { struct SShardedCache {
uint32_t shardMask; uint32_t shardMask;
TdThreadMutex capacityMutex; TdThreadMutex capacityMutex;
size_t capacity; size_t capacity;
bool strictCapacity; bool strictCapacity;
uint64_t lastId; // atomic var for last id uint64_t lastId; // atomic var for last id
}; };
struct SLRUCache { struct SLRUCache {
@ -593,7 +614,7 @@ struct SLRUCache {
}; };
static int getDefaultCacheShardBits(size_t capacity) { static int getDefaultCacheShardBits(size_t capacity) {
int numShardBits = 0; int numShardBits = 0;
size_t minShardSize = 512 * 1024; size_t minShardSize = 512 * 1024;
size_t numShards = capacity / minShardSize; size_t numShards = capacity / minShardSize;
while (numShards >>= 1) { while (numShards >>= 1) {
@ -621,7 +642,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
numShardBits = getDefaultCacheShardBits(capacity); numShardBits = getDefaultCacheShardBits(capacity);
} }
int numShards = 1 << numShardBits; int numShards = 1 << numShardBits;
cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard)); cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
if (!cache->shards) { if (!cache->shards) {
taosMemoryFree(cache); taosMemoryFree(cache);
@ -629,7 +650,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
return NULL; return NULL;
} }
bool strictCapacity = 1; bool strictCapacity = 1;
size_t perShard = (capacity + (numShards - 1)) / numShards; size_t perShard = (capacity + (numShards - 1)) / numShards;
for (int i = 0; i < numShards; ++i) { for (int i = 0; i < numShards; ++i) {
taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits); taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits);
@ -653,7 +674,7 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
int numShards = cache->numShards; int numShards = cache->numShards;
assert(numShards > 0); assert(numShards > 0);
for (int i = 0; i < numShards; ++i) { for (int i = 0; i < numShards; ++i) {
taosLRUCacheShardCleanup(&cache->shards[i]); taosLRUCacheShardCleanup(&cache->shards[i]);
} }
taosMemoryFree(cache->shards); taosMemoryFree(cache->shards);
cache->shards = 0; cache->shards = 0;
@ -666,11 +687,12 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
} }
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge, LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) { _taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) {
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen); uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
uint32_t shardIndex = hash & cache->shardedCache.shardMask; uint32_t shardIndex = hash & cache->shardedCache.shardMask;
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle, priority); return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle,
priority);
} }
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) { LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) {
@ -699,7 +721,7 @@ bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle) {
return false; return false;
} }
uint32_t hash = ((SLRUEntry *) handle)->hash; uint32_t hash = ((SLRUEntry *)handle)->hash;
uint32_t shardIndex = hash & cache->shardedCache.shardMask; uint32_t shardIndex = hash & cache->shardedCache.shardMask;
return taosLRUCacheShardRef(&cache->shards[shardIndex], handle); return taosLRUCacheShardRef(&cache->shards[shardIndex], handle);
@ -710,15 +732,13 @@ bool taosLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRe
return false; return false;
} }
uint32_t hash = ((SLRUEntry *) handle)->hash; uint32_t hash = ((SLRUEntry *)handle)->hash;
uint32_t shardIndex = hash & cache->shardedCache.shardMask; uint32_t shardIndex = hash & cache->shardedCache.shardMask;
return taosLRUCacheShardRelease(&cache->shards[shardIndex], handle, eraseIfLastRef); return taosLRUCacheShardRelease(&cache->shards[shardIndex], handle, eraseIfLastRef);
} }
void* taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle) { void *taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle) { return ((SLRUEntry *)handle)->value; }
return ((SLRUEntry*) handle)->value;
}
size_t taosLRUCacheGetUsage(SLRUCache *cache) { size_t taosLRUCacheGetUsage(SLRUCache *cache) {
size_t usage = 0; size_t usage = 0;
@ -742,7 +762,7 @@ size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) {
void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity) { void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity) {
uint32_t numShards = cache->numShards; uint32_t numShards = cache->numShards;
size_t perShard = (capacity + (numShards = 1)) / numShards; size_t perShard = (capacity + (numShards - 1)) / numShards;
taosThreadMutexLock(&cache->shardedCache.capacityMutex); taosThreadMutexLock(&cache->shardedCache.capacityMutex);
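Besides the formatting changes, the last hunk fixes a real bug: `(capacity + (numShards = 1))` assigned 1 to `numShards` instead of adding `numShards - 1` for a round-up division, so each shard was given roughly the whole capacity. A small illustrative sketch of the per-shard capacity and shard-selection math, assuming the shard mask is `numShards - 1` (the capacity and hash values below are made up):

```c
#include <stdint.h>
#include <stdio.h>

int main(void) {
  size_t   capacity     = 10 * 1024 * 1024; /* example: 10 MB total capacity */
  int      numShardBits = 3;                /* example: 8 shards */
  uint32_t numShards    = 1u << numShardBits;
  uint32_t shardMask    = numShards - 1;    /* assumed layout of shardedCache.shardMask */

  /* Round up so the shards together cover at least the requested capacity. */
  size_t perShard = (capacity + (numShards - 1)) / numShards;

  uint32_t hash       = 0x9E3779B9u;        /* example key hash */
  uint32_t shardIndex = hash & shardMask;

  printf("perShard=%zu shardIndex=%u\n", perShard, shardIndex);
  return 0;
}
```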

View File

@ -17,6 +17,7 @@ import string
import requests import requests
import time import time
import socket import socket
import json
from .boundary import DataBoundary from .boundary import DataBoundary
import taos import taos
from util.log import * from util.log import *
@ -534,6 +535,7 @@ class TDCom:
ts_value = self.genTs()[0] ts_value = self.genTs()[0]
column_value_list = list() column_value_list = list()
column_value_list.append(ts_value)
if column_elm_list is None: if column_elm_list is None:
column_value_list = list(map(lambda i: self.gen_random_type_value(i, self.default_varchar_length, self.default_varchar_datatype, self.default_nchar_length, self.default_nchar_datatype), self.full_type_list)) column_value_list = list(map(lambda i: self.gen_random_type_value(i, self.default_varchar_length, self.default_varchar_datatype, self.default_nchar_length, self.default_nchar_datatype), self.full_type_list))
else: else:
@ -550,7 +552,7 @@ class TDCom:
column_value_list.append(self.gen_random_type_value(column_elm["type"], "", "", "", "")) column_value_list.append(self.gen_random_type_value(column_elm["type"], "", "", "", ""))
else: else:
continue continue
column_value_list = [self.ts_value] + self.column_value_list # column_value_list = [self.ts_value] + self.column_value_list
return column_value_list return column_value_list
def create_stable(self, tsql, dbname=None, stbname="stb", column_elm_list=None, tag_elm_list=None, def create_stable(self, tsql, dbname=None, stbname="stb", column_elm_list=None, tag_elm_list=None,
@ -639,7 +641,16 @@ class TDCom:
else: else:
for num in range(count): for num in range(count):
column_value_list = self.gen_column_value_list(column_ele_list, f'{start_ts_value}+{num}s') column_value_list = self.gen_column_value_list(column_ele_list, f'{start_ts_value}+{num}s')
column_value_str = ", ".join(str(v) for v in column_value_list) # column_value_str = ", ".join(str(v) for v in column_value_list)
column_value_str = ''
idx = 0
for column_value in column_value_list:
if isinstance(column_value, str) and idx != 0:
column_value_str += f'"{column_value}", '
else:
column_value_str += f'{column_value}, '
idx += 1
column_value_str = column_value_str.rstrip()[:-1]
insert_sql = f'insert into {dbname}.{tbname} values ({column_value_str});' insert_sql = f'insert into {dbname}.{tbname} values ({column_value_str});'
tsql.execute(insert_sql) tsql.execute(insert_sql)
def getOneRow(self, location, containElm): def getOneRow(self, location, containElm):
@ -651,4 +662,16 @@ class TDCom:
return res_list return res_list
else: else:
tdLog.exit(f"getOneRow out of range: row_index={location} row_count={self.query_row}") tdLog.exit(f"getOneRow out of range: row_index={location} row_count={self.query_row}")
def is_json(msg):
if isinstance(msg, str):
try:
json.loads(msg)
return True
except:
return False
else:
return False
tdCom = TDCom() tdCom = TDCom()

View File

@ -71,6 +71,34 @@ TAOS_KEYWORDS = [
"COPY", "IF", "NOW", "STABLES", "WHERE", "COPY", "IF", "NOW", "STABLES", "WHERE",
] ]
NUM_FUNC = [
"ABS", "ACOS", "ASIN", "ATAN", "CEIL", "COS", "FLOOR", "LOG", "POW", "ROUND", "SIN", "SQRT", "TAN",
]
STR_FUNC = [
"CHAR_LENGTH", "CONCAT", "CONCAT_WS", "LENGTH", "LOWER","LTRIM", "RTRIM", "SUBSTR", "UPPER",
]
CONVER_FUNC = ["CASR", "TO_ISO8601", "TO_JSON", "TP_UNIXTIMESTAMP"]
SELECT_FUNC = [
"APERCENTILE", "BOTTOM", "FIRST", "INTERP", "LAST", "MAX", "MIN", "PERCENTILE", "TAIL", "TOP", "UNIQUE",
]
AGG_FUNC = [
"AVG", "COUNT", "ELAPSED", "LEASTSQUARES", "MODE", "SPREAD", "STDDEV", "SUM", "HYPERLOGLOG", "HISTOGRAM",
]
TS_FUNC = [
"CSUM", "DERIVATIVE", "DIFF", "IRATE", "MAVG", "SAMPLE", "STATECOUNT", "STATEDURATION", "TWA"
]
SYSINFO_FUCN = [
"DATABASE", "CLIENT_VERSION", "SERVER_VERSION", "SERVER_STATUS", "CURRENT_USER", "USER"
]
# basic data type boundary # basic data type boundary
TINYINT_MAX = 127 TINYINT_MAX = 127
TINYINT_MIN = -128 TINYINT_MIN = -128
@ -84,16 +112,16 @@ SMALLINT_MIN = -32768
SMALLINT_UN_MAX = 65535 SMALLINT_UN_MAX = 65535
SMALLINT_UN_MIN = 0 SMALLINT_UN_MIN = 0
INT_MAX = 2147483647 INT_MAX = 2_147_483_647
INT_MIN = -2147483648 INT_MIN = -2_147_483_648
INT_UN_MAX = 4294967295 INT_UN_MAX = 4_294_967_295
INT_UN_MIN = 0 INT_UN_MIN = 0
BIGINT_MAX = 9223372036854775807 BIGINT_MAX = 9_223_372_036_854_775_807
BIGINT_MIN = -9223372036854775808 BIGINT_MIN = -9_223_372_036_854_775_808
BIGINT_UN_MAX = 18446744073709551615 BIGINT_UN_MAX = 18_446_744_073_709_551_615
BIGINT_UN_MIN = 0 BIGINT_UN_MIN = 0
FLOAT_MAX = 3.40E+38 FLOAT_MAX = 3.40E+38
@ -131,13 +159,13 @@ COL_COUNT_MIN = 2
TAG_COL_COUNT_MAX = 4096 TAG_COL_COUNT_MAX = 4096
TAG_COL_COUNT_MIN = 3 TAG_COL_COUNT_MIN = 3
MNODE_SHM_SIZE_MAX = 2147483647 MNODE_SHM_SIZE_MAX = 2_147_483_647
MNODE_SHM_SIZE_MIN = 6292480 MNODE_SHM_SIZE_MIN = 6_292_480
MNODE_SHM_SIZE_DEFAULT = 6292480 MNODE_SHM_SIZE_DEFAULT = 6_292_480
VNODE_SHM_SIZE_MAX = 2147483647 VNODE_SHM_SIZE_MAX = 2_147_483_647
VNODE_SHM_SIZE_MIN = 6292480 VNODE_SHM_SIZE_MIN = 6_292_480
VNODE_SHM_SIZE_DEFAULT = 31458304 VNODE_SHM_SIZE_DEFAULT = 31_458_304
# time_init # time_init
TIME_MS = 1 TIME_MS = 1
@ -160,6 +188,7 @@ INTERVAL_MIN = 1 * TIME_MS if PRECISION == PRECISION_DEFAULT else 1 * TIME_US
# streams and related agg-function # streams and related agg-function
SMA_INDEX_FUNCTIONS = ["MIN", "MAX"] SMA_INDEX_FUNCTIONS = ["MIN", "MAX"]
ROLLUP_FUNCTIONS = ["AVG", "SUM", "MIN", "MAX", "LAST", "FIRST"] ROLLUP_FUNCTIONS = ["AVG", "SUM", "MIN", "MAX", "LAST", "FIRST"]
BLOCK_FUNCTIONS = ["SUM", "MIN", "MAX"]
SMA_WATMARK_MAXDELAY_INIT = ['a', "s", "m"] SMA_WATMARK_MAXDELAY_INIT = ['a', "s", "m"]
WATERMARK_MAX = 900000 WATERMARK_MAX = 900000
WATERMARK_MIN = 0 WATERMARK_MIN = 0

View File

@ -96,6 +96,15 @@ class TDSql:
return self.queryResult return self.queryResult
return self.queryRows return self.queryRows
def is_err_sql(self, sql):
err_flag = True
try:
self.cursor.execute(sql)
except BaseException:
err_flag = False
return False if err_flag else True
def getVariable(self, search_attr): def getVariable(self, search_attr):
''' '''
get variable of search_attr access "show variables" get variable of search_attr access "show variables"
@ -249,7 +258,6 @@ class TDSql:
raise Exception(repr(e)) raise Exception(repr(e))
return self.queryResult return self.queryResult
def executeTimes(self, sql, times): def executeTimes(self, sql, times):
for i in range(times): for i in range(times):
try: try:
@ -336,6 +344,38 @@ class TDSql:
elif precision == "ns": elif precision == "ns":
return int(times*1000*1000) return int(times*1000*1000)
def get_type(self, col):
if self.cursor.istype(col, "BOOL"):
return "BOOL"
if self.cursor.istype(col, "INT"):
return "INT"
if self.cursor.istype(col, "BIGINT"):
return "BIGINT"
if self.cursor.istype(col, "TINYINT"):
return "TINYINT"
if self.cursor.istype(col, "SMALLINT"):
return "SMALLINT"
if self.cursor.istype(col, "FLOAT"):
return "FLOAT"
if self.cursor.istype(col, "DOUBLE"):
return "DOUBLE"
if self.cursor.istype(col, "BINARY"):
return "BINARY"
if self.cursor.istype(col, "NCHAR"):
return "NCHAR"
if self.cursor.istype(col, "TIMESTAMP"):
return "TIMESTAMP"
if self.cursor.istype(col, "JSON"):
return "JSON"
if self.cursor.istype(col, "TINYINT UNSIGNED"):
return "TINYINT UNSIGNED"
if self.cursor.istype(col, "SMALLINT UNSIGNED"):
return "SMALLINT UNSIGNED"
if self.cursor.istype(col, "INT UNSIGNED"):
return "INT UNSIGNED"
if self.cursor.istype(col, "BIGINT UNSIGNED"):
return "BIGINT UNSIGNED"
def taosdStatus(self, state): def taosdStatus(self, state):
tdLog.sleep(5) tdLog.sleep(5)
pstate = 0 pstate = 0

View File

@ -111,7 +111,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb $totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = 1 $expectmsgcnt = 1000000
$expectrowcnt = 100 $expectrowcnt = 100
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
@ -131,9 +131,6 @@ endi
if $data[0][1] != $consumerId then if $data[0][1] != $consumerId then
return -1 return -1
endi endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectrowcnt then if $data[0][3] != $expectrowcnt then
return -1 return -1
endi endi
@ -183,7 +180,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfCtb = $rowsPerCtb $totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = 1 $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb print == start consumer to pull msgs from ctb
@ -254,7 +251,7 @@ endi
$consumerId = 0 $consumerId = 0
$totalMsgOfNtb = $rowsPerCtb $totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb print == start consumer to pull msgs from ntb

View File

@ -80,7 +80,7 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb $totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum $totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
@ -168,7 +168,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum $totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ctb_function $topicList = ' . topic_ctb_function
@ -245,7 +245,7 @@ $topicList = $topicList . '
$consumerId = 0 $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb $totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum $totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb $expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit ) sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )

View File

@ -313,9 +313,11 @@ class TDTestCase:
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.query("select udf1(num1) , bottom(num1,1) from tb;") tdSql.query("select udf1(num1) , bottom(num1,1) from tb;")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.error("select udf1(num1) , last_row(num1) from tb;") tdSql.query("select udf1(num1) , last_row(num1) from tb;")
tdSql.checkRows(1)
tdSql.error("select round(num1) , last_row(num1) from tb;") tdSql.query("select round(num1) , last_row(num1) from tb;")
tdSql.checkRows(1)
# stable # stable
@ -340,8 +342,10 @@ class TDTestCase:
tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;") tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.error("select udf1(c1) , last_row(c1) from stb1;") tdSql.query("select udf1(c1) , last_row(c1) from stb1;")
tdSql.error("select ceil(c1) , last_row(c1) from stb1;") tdSql.checkRows(1)
tdSql.query("select ceil(c1) , last_row(c1) from stb1;")
tdSql.checkRows(1)
# regular table with compute functions # regular table with compute functions

View File

@ -315,9 +315,11 @@ class TDTestCase:
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.query("select udf1(num1) , bottom(num1,1) from tb;") tdSql.query("select udf1(num1) , bottom(num1,1) from tb;")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.error("select udf1(num1) , last_row(num1) from tb;") tdSql.query("select udf1(num1) , last_row(num1) from tb;")
tdSql.checkRows(1)
tdSql.error("select round(num1) , last_row(num1) from tb;") tdSql.query("select round(num1) , last_row(num1) from tb;")
tdSql.checkRows(1)
# stable # stable
@ -342,8 +344,10 @@ class TDTestCase:
tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;") tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.error("select udf1(c1) , last_row(c1) from stb1;") tdSql.query("select udf1(c1) , last_row(c1) from stb1;")
tdSql.error("select ceil(c1) , last_row(c1) from stb1;") tdSql.checkRows(1)
tdSql.query("select ceil(c1) , last_row(c1) from stb1;")
tdSql.checkRows(1)
# regular table with compute functions # regular table with compute functions

View File

@ -312,9 +312,11 @@ class TDTestCase:
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.query("select udf1(num1) , bottom(num1,1) from tb;") tdSql.query("select udf1(num1) , bottom(num1,1) from tb;")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.error("select udf1(num1) , last_row(num1) from tb;") tdSql.query("select udf1(num1) , last_row(num1) from tb;")
tdSql.checkRows(1)
tdSql.error("select round(num1) , last_row(num1) from tb;") tdSql.query("select round(num1) , last_row(num1) from tb;")
tdSql.checkRows(1)
# stable # stable
@ -339,8 +341,10 @@ class TDTestCase:
tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;") tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;")
tdSql.checkRows(1) tdSql.checkRows(1)
tdSql.error("select udf1(c1) , last_row(c1) from stb1;") tdSql.query("select udf1(c1) , last_row(c1) from stb1;")
tdSql.error("select ceil(c1) , last_row(c1) from stb1;") tdSql.checkRows(1)
tdSql.query("select ceil(c1) , last_row(c1) from stb1;")
tdSql.checkRows(1)
# regular table with compute functions # regular table with compute functions

View File

@ -163,38 +163,6 @@ class TDTestCase:
# return filter(None, sqls) # return filter(None, sqls)
return list(filter(None, sqls)) return list(filter(None, sqls))
def __get_type(self, col):
if tdSql.cursor.istype(col, "BOOL"):
return "BOOL"
if tdSql.cursor.istype(col, "INT"):
return "INT"
if tdSql.cursor.istype(col, "BIGINT"):
return "BIGINT"
if tdSql.cursor.istype(col, "TINYINT"):
return "TINYINT"
if tdSql.cursor.istype(col, "SMALLINT"):
return "SMALLINT"
if tdSql.cursor.istype(col, "FLOAT"):
return "FLOAT"
if tdSql.cursor.istype(col, "DOUBLE"):
return "DOUBLE"
if tdSql.cursor.istype(col, "BINARY"):
return "BINARY"
if tdSql.cursor.istype(col, "NCHAR"):
return "NCHAR"
if tdSql.cursor.istype(col, "TIMESTAMP"):
return "TIMESTAMP"
if tdSql.cursor.istype(col, "JSON"):
return "JSON"
if tdSql.cursor.istype(col, "TINYINT UNSIGNED"):
return "TINYINT UNSIGNED"
if tdSql.cursor.istype(col, "SMALLINT UNSIGNED"):
return "SMALLINT UNSIGNED"
if tdSql.cursor.istype(col, "INT UNSIGNED"):
return "INT UNSIGNED"
if tdSql.cursor.istype(col, "BIGINT UNSIGNED"):
return "BIGINT UNSIGNED"
def explain_check(self): def explain_check(self):
sqls = self.sql_list() sqls = self.sql_list()
tdLog.printNoPrefix("===step 1: curent case, must return query OK") tdLog.printNoPrefix("===step 1: curent case, must return query OK")

View File

@ -116,37 +116,6 @@ class TDTestCase:
# return filter(None, sqls) # return filter(None, sqls)
return list(filter(None, sqls)) return list(filter(None, sqls))
def __get_type(self, col):
if tdSql.cursor.istype(col, "BOOL"):
return "BOOL"
if tdSql.cursor.istype(col, "INT"):
return "INT"
if tdSql.cursor.istype(col, "BIGINT"):
return "BIGINT"
if tdSql.cursor.istype(col, "TINYINT"):
return "TINYINT"
if tdSql.cursor.istype(col, "SMALLINT"):
return "SMALLINT"
if tdSql.cursor.istype(col, "FLOAT"):
return "FLOAT"
if tdSql.cursor.istype(col, "DOUBLE"):
return "DOUBLE"
if tdSql.cursor.istype(col, "BINARY"):
return "BINARY"
if tdSql.cursor.istype(col, "NCHAR"):
return "NCHAR"
if tdSql.cursor.istype(col, "TIMESTAMP"):
return "TIMESTAMP"
if tdSql.cursor.istype(col, "JSON"):
return "JSON"
if tdSql.cursor.istype(col, "TINYINT UNSIGNED"):
return "TINYINT UNSIGNED"
if tdSql.cursor.istype(col, "SMALLINT UNSIGNED"):
return "SMALLINT UNSIGNED"
if tdSql.cursor.istype(col, "INT UNSIGNED"):
return "INT UNSIGNED"
if tdSql.cursor.istype(col, "BIGINT UNSIGNED"):
return "BIGINT UNSIGNED"
def hyperloglog_check(self): def hyperloglog_check(self):
sqls = self.sql_list() sqls = self.sql_list()

View File

@ -195,38 +195,6 @@ class TDTestCase:
# return filter(None, sqls) # return filter(None, sqls)
return list(filter(None, current_sqls)), list(filter(None, err_sqls)) return list(filter(None, current_sqls)), list(filter(None, err_sqls))
def __get_type(self, col):
if tdSql.cursor.istype(col, "BOOL"):
return "BOOL"
if tdSql.cursor.istype(col, "INT"):
return "INT"
if tdSql.cursor.istype(col, "BIGINT"):
return "BIGINT"
if tdSql.cursor.istype(col, "TINYINT"):
return "TINYINT"
if tdSql.cursor.istype(col, "SMALLINT"):
return "SMALLINT"
if tdSql.cursor.istype(col, "FLOAT"):
return "FLOAT"
if tdSql.cursor.istype(col, "DOUBLE"):
return "DOUBLE"
if tdSql.cursor.istype(col, "BINARY"):
return "BINARY"
if tdSql.cursor.istype(col, "NCHAR"):
return "NCHAR"
if tdSql.cursor.istype(col, "TIMESTAMP"):
return "TIMESTAMP"
if tdSql.cursor.istype(col, "JSON"):
return "JSON"
if tdSql.cursor.istype(col, "TINYINT UNSIGNED"):
return "TINYINT UNSIGNED"
if tdSql.cursor.istype(col, "SMALLINT UNSIGNED"):
return "SMALLINT UNSIGNED"
if tdSql.cursor.istype(col, "INT UNSIGNED"):
return "INT UNSIGNED"
if tdSql.cursor.istype(col, "BIGINT UNSIGNED"):
return "BIGINT UNSIGNED"
def leastsquares_check(self): def leastsquares_check(self):
current_sqls, err_sqls = self.sql_list() current_sqls, err_sqls = self.sql_list()
for i in range(len(err_sqls)): for i in range(len(err_sqls)):

View File

@ -159,38 +159,6 @@ class TDTestCase:
# return filter(None, sqls) # return filter(None, sqls)
return list(filter(None, sqls)) return list(filter(None, sqls))
def __get_type(self, col):
if tdSql.cursor.istype(col, "BOOL"):
return "BOOL"
if tdSql.cursor.istype(col, "INT"):
return "INT"
if tdSql.cursor.istype(col, "BIGINT"):
return "BIGINT"
if tdSql.cursor.istype(col, "TINYINT"):
return "TINYINT"
if tdSql.cursor.istype(col, "SMALLINT"):
return "SMALLINT"
if tdSql.cursor.istype(col, "FLOAT"):
return "FLOAT"
if tdSql.cursor.istype(col, "DOUBLE"):
return "DOUBLE"
if tdSql.cursor.istype(col, "BINARY"):
return "BINARY"
if tdSql.cursor.istype(col, "NCHAR"):
return "NCHAR"
if tdSql.cursor.istype(col, "TIMESTAMP"):
return "TIMESTAMP"
if tdSql.cursor.istype(col, "JSON"):
return "JSON"
if tdSql.cursor.istype(col, "TINYINT UNSIGNED"):
return "TINYINT UNSIGNED"
if tdSql.cursor.istype(col, "SMALLINT UNSIGNED"):
return "SMALLINT UNSIGNED"
if tdSql.cursor.istype(col, "INT UNSIGNED"):
return "INT UNSIGNED"
if tdSql.cursor.istype(col, "BIGINT UNSIGNED"):
return "BIGINT UNSIGNED"
def spread_check(self): def spread_check(self):
sqls = self.sql_list() sqls = self.sql_list()
tdLog.printNoPrefix("===step 1: curent case, must return query OK") tdLog.printNoPrefix("===step 1: curent case, must return query OK")

View File

@ -0,0 +1,259 @@
from ntpath import join
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.dnodes = 5
self.mnodes = 3
self.idIndex = 0
self.roleIndex = 2
self.mnodeStatusIndex = 3
self.mnodeEpIndex = 1
self.dnodeStatusIndex = 4
self.mnodeCheckCnt = 10
self.host = socket.gethostname()
self.startPort = 6030
self.portStep = 100
self.dnodeOfLeader = 0
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: topic: select * from stb, while consume, add column int-A/bianry-B/float-C, and then modify B, drop C")
tdLog.printNoPrefix("add tag int-A/bianry-B/float-C, and then rename A, modify B, drop C, set t2")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}, {'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
topicNameList = ['topic1']
expectRowsList = []
queryStringList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("async insert data")
# pThread = tmqCom.asyncInsertData(paraDict)
tmqCom.insert_data_2(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"],paraDict["ctbStartIdx"])
tdLog.info("create topics from stb with filter")
queryStringList.append("select * from %s.%s" %(paraDict['dbName'], paraDict['stbName']))
sqlString = "create topic %s as %s" %(topicNameList[0], queryStringList[0])
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
dstFile = tmqCom.getResultFileByTaosShell(consumerId, queryStringList[0])
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the notify info of start consume, then alter schema")
tmqCom.getStartConsumeNotifyFromTmqsim()
# add column double-A/bianry-B/double-C, and then modify B, drop C
sqlString = "alter table %s.%s add column newc1 double"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s add column newc2 binary(16)"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s add column newc3 double"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s modify column newc2 binary(32)"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s drop column newc3"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
# add tag double-A/bianry-B/double-C, and then rename A, modify B, drop C, set t1
sqlString = "alter table %s.%s add tag newt1 double"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s add tag newt2 binary(16)"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s add tag newt3 double"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s rename tag newt1 newt1n"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s modify tag newt2 binary(32)"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s drop tag newt3"%(paraDict["dbName"],paraDict['stbName'])
tdSql.execute(sqlString)
sqlString = "alter table %s.%s0 set tag newt2='new tag'"%(paraDict["dbName"],paraDict['ctbPrefix'])
tdSql.execute(sqlString)
tdLog.info("check the consume result")
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
tdLog.info("expect consume rows: %d"%(expectRowsList[0]))
tdLog.info("act consume rows: %d"%(resultList[0]))
if expectRowsList[0] != resultList[0]:
tdLog.exit("0 tmq consume rows error!")
tmqCom.checkTmqConsumeFileContent(consumerId, dstFile)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: topic: select * from ntb, while consume, add column int-A/bianry-B/float-C, and then rename A, modify B, drop C")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':2}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1}
ntbName = 'ntb'
topicNameList = ['topic1']
expectRowsList = []
queryStringList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ntb")
tdCom.create_table(tdSql, dbname=paraDict["dbName"],tbname=ntbName,column_elm_list=paraDict['colSchema'],count=1)
tdLog.info("start insert data ....")
# pThread = tmqCom.asyncInsertData(paraDict)
tdCom.insert_rows(tdSql, dbname=paraDict["dbName"], tbname=ntbName, column_ele_list=paraDict['colSchema'], start_ts_value=paraDict["startTs"], count=paraDict["rowsPerTbl"])
tdLog.info("insert data end")
tdLog.info("create topics from ntb with filter")
queryStringList.append("select * from %s.%s" %(paraDict['dbName'], ntbName))
sqlString = "create topic %s as %s" %(topicNameList[0], queryStringList[0])
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
dstFile = tmqCom.getResultFileByTaosShell(consumerId, queryStringList[0])
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
tdLog.info("wait the notify info of start consume, then alter schema")
tmqCom.getStartConsumeNotifyFromTmqsim()
# add column double-A/bianry-B/double-C, and then rename A, modify B, drop C
sqlString = "alter table %s.%s add column newc1 double"%(paraDict["dbName"],ntbName)
tdSql.execute(sqlString)
sqlString = "alter table %s.%s add column newc2 binary(16)"%(paraDict["dbName"],ntbName)
tdSql.execute(sqlString)
sqlString = "alter table %s.%s add column newc3 double"%(paraDict["dbName"],ntbName)
tdSql.execute(sqlString)
sqlString = "alter table %s.%s rename column newc1 newc1n"%(paraDict["dbName"],ntbName)
tdSql.execute(sqlString)
sqlString = "alter table %s.%s modify column newc2 binary(32)"%(paraDict["dbName"],ntbName)
tdSql.execute(sqlString)
sqlString = "alter table %s.%s drop column newc3"%(paraDict["dbName"],ntbName)
tdSql.execute(sqlString)
tdLog.info("check the consume result")
tdSql.query(queryStringList[0])
expectRowsList.append(tdSql.getRows())
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
tdLog.info("expect consume rows: %d"%(expectRowsList[0]))
tdLog.info("act consume rows: %d"%(resultList[0]))
if expectRowsList[0] != resultList[0]:
tdLog.exit("0 tmq consume rows error!")
tmqCom.checkTmqConsumeFileContent(consumerId, dstFile)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -93,7 +93,7 @@ class TMQCom:
return resultList return resultList
def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0): def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0,snapshot=0):
buildPath = tdCom.getBuildPath() buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath() cfgPath = tdCom.getClientCfgPath()
if valgrind == 1: if valgrind == 1:
@ -109,7 +109,7 @@ class TMQCom:
os.system(shellCmd) os.system(shellCmd)
processorName = processorNameNew processorName = processorNameNew
shellCmd = 'mintty -h never ' + processorName + ' -c ' + cfgPath shellCmd = 'mintty -h never ' + processorName + ' -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
shellCmd += "> nul 2>&1 &" shellCmd += "> nul 2>&1 &"
else: else:
processorName = buildPath + '/build/bin/tmq_sim' processorName = buildPath + '/build/bin/tmq_sim'
@ -119,7 +119,7 @@ class TMQCom:
os.system(shellCmd) os.system(shellCmd)
processorName = processorNameNew processorName = processorNameNew
shellCmd = 'nohup ' + processorName + ' -c ' + cfgPath shellCmd = 'nohup ' + processorName + ' -c ' + cfgPath
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) shellCmd += " -y %d -d %s -g %d -r %d -w %s -e %d "%(pollDelay, dbName, showMsg, showRow, cdbName, snapshot)
shellCmd += "> /dev/null 2>&1 &" shellCmd += "> /dev/null 2>&1 &"
tdLog.info(shellCmd) tdLog.info(shellCmd)
os.system(shellCmd) os.system(shellCmd)
@ -383,6 +383,69 @@ class TMQCom:
pThread.start() pThread.start()
return pThread return pThread
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
lines = 0
while True:
dst = queryFile.readline()
src = consumeFile.readline()
lines += 1
if dst:
if dst != src:
tdLog.info("src row: %s"%src)
tdLog.info("dst row: %s"%dst)
tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines))
else:
break
return
def getResultFileByTaosShell(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
return dstFile
def checkTmqConsumeFileContent(self, consumerId, dstFile):
cfgPath = tdCom.getClientCfgPath()
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
lines = 0
while True:
dst = queryFile.readline()
src = consumeFile.readline()
lines += 1
if dst:
if dst != src:
tdLog.info("src row: %s"%src)
tdLog.info("dst row: %s"%dst)
tdLog.exit("consumerId %d consume rows[%d] is not match the rows by direct query"%(consumerId, lines))
else:
break
return
def close(self): def close(self):
self.cursor.close() self.cursor.close()

View File

@ -0,0 +1,107 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
topicNameList = ['topic1']
expectRowsList = []
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdDnodes.stop(1)
time.sleep(2)
tdDnodes.start(1)
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
# sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.tmqCase1()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -153,3 +153,4 @@ python3 ./test.py -f 7-tmq/tmqUdf.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5 #python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
python3 ./test.py -f 7-tmq/tmqConsumerGroup.py python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py -f 7-tmq/tmqAlterSchema.py

View File

@ -36,7 +36,11 @@
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
#define MAX_VGROUP_CNT (32) #define MAX_VGROUP_CNT (32)
typedef enum { NOTIFY_CMD_START_CONSUM, NOTIFY_CMD_START_COMMIT, NOTIFY_CMD_ID_BUTT } NOTIFY_CMD_ID; typedef enum {
NOTIFY_CMD_START_CONSUM,
NOTIFY_CMD_START_COMMIT,
NOTIFY_CMD_ID_BUTT,
} NOTIFY_CMD_ID;
typedef struct { typedef struct {
TdThread thread; TdThread thread;
@ -633,8 +637,9 @@ void loop_consume(SThreadInfo* pInfo) {
} }
} }
uint64_t lastPrintTime = taosGetTimestampMs(); int64_t lastTotalMsgs = 0;
uint64_t startTs = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs();
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
while (running) { while (running) {
@ -648,19 +653,21 @@ void loop_consume(SThreadInfo* pInfo) {
totalMsgs++; totalMsgs++;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 10 * 1000) { if (currentPrintTime - lastPrintTime > 10 * 1000) {
taosFprintfFile(g_fp, "consumer id %d has currently poll total msgs: %" PRId64 "\n", pInfo->consumerId, taosFprintfFile(g_fp,
totalMsgs); "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
lastPrintTime = currentPrintTime; pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0/(currentPrintTime - lastPrintTime));
} lastPrintTime = currentPrintTime;
lastTotalMsgs = totalMsgs;
}
if (0 == once_flag) { if (0 == once_flag) {
once_flag = 1; once_flag = 1;
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
} }
if (totalRows >= pInfo->expectMsgCnt) { if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
char tmpString[128]; char tmpString[128];
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString)); taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
break; break;
@ -884,8 +891,8 @@ int main(int32_t argc, char* argv[]) {
double tInMs = (double)t / 1000000.0; double tInMs = (double)t / 1000000.0;
taosFprintfFile(g_fp, taosFprintfFile(g_fp,
"Spent %.4f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.2f msgs/second\n\n", "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n",
tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs)); tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosFprintfFile(g_fp, "==== close tmqlog ====\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
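The two hunks above add a per-interval poll rate to the periodic log line and tighten the precision of the final throughput print. A small self-contained sketch of both calculations with made-up numbers:

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
  /* Periodic print: rate over the last interval (values are illustrative). */
  int64_t totalMsgs = 15000, lastTotalMsgs = 10000;        /* msgs polled so far / at last print */
  int64_t currentPrintTime = 30000, lastPrintTime = 20000; /* timestamps in milliseconds */
  double  periodRate = (totalMsgs - lastTotalMsgs) * 1000.0 / (currentPrintTime - lastPrintTime);
  printf("period rate: %.3f msgs/second\n", periodRate);   /* prints 500.000 */

  /* Final print: overall throughput over the whole run. */
  int64_t elapsedUs = 20000000;                            /* 20 seconds, in microseconds */
  double  tInMs = (double)elapsedUs / 1000000.0;           /* value is in seconds, despite the name */
  printf("throughput: %.3f msgs/second\n", (double)totalMsgs / tInMs); /* prints 750.000 */
  return 0;
}
```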