Merge remote-tracking branch 'origin/main' into enh/TD-21207

This commit is contained in:
Shengliang Guan 2023-01-03 20:10:00 +08:00
commit 1dbb9db552
16 changed files with 329 additions and 116 deletions

View File

@ -772,8 +772,8 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
return 0;
}
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
// #define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
// #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
if (pIter == NULL) return NULL;

View File

@ -66,6 +66,17 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
return -1;
}
if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) {
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId,
pHandle->subKey, offset.val.version);
}
}
}
taosMemoryFree(memBuf);
}

View File

@ -19,9 +19,13 @@
#define SL_MAX_LEVEL 5
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_GET_NODE_FORWARD(n, l) ((SMemSkipListNode *)atomic_load_64((int64_t *)&SL_NODE_FORWARD(n, l)))
#define SL_GET_NODE_BACKWARD(n, l) ((SMemSkipListNode *)atomic_load_64((int64_t *)&SL_NODE_BACKWARD(n, l)))
#define SL_SET_NODE_FORWARD(n, l, p) atomic_store_64((int64_t *)&SL_NODE_FORWARD(n, l), (int64_t)(p))
#define SL_SET_NODE_BACKWARD(n, l, p) atomic_store_64((int64_t *)&SL_NODE_BACKWARD(n, l), (int64_t)(p))
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
@ -246,18 +250,18 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
if (pFrom == NULL) {
// create from head or tail
if (backward) {
pIter->pNode = SL_NODE_BACKWARD(pTbData->sl.pTail, 0);
pIter->pNode = SL_GET_NODE_BACKWARD(pTbData->sl.pTail, 0);
} else {
pIter->pNode = SL_NODE_FORWARD(pTbData->sl.pHead, 0);
pIter->pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
}
} else {
// create from a key
if (backward) {
tbDataMovePosTo(pTbData, pos, pFrom, SL_MOVE_BACKWARD);
pIter->pNode = SL_NODE_BACKWARD(pos[0], 0);
pIter->pNode = SL_GET_NODE_BACKWARD(pos[0], 0);
} else {
tbDataMovePosTo(pTbData, pos, pFrom, 0);
pIter->pNode = SL_NODE_FORWARD(pos[0], 0);
pIter->pNode = SL_GET_NODE_FORWARD(pos[0], 0);
}
}
}
@ -271,7 +275,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return false;
}
pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0);
pIter->pNode = SL_GET_NODE_BACKWARD(pIter->pNode, 0);
if (pIter->pNode == pIter->pTbData->sl.pHead) {
return false;
}
@ -282,7 +286,7 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return false;
}
pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0);
pIter->pNode = SL_GET_NODE_FORWARD(pIter->pNode, 0);
if (pIter->pNode == pIter->pTbData->sl.pTail) {
return false;
}
@ -335,7 +339,7 @@ static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid
int8_t maxLevel = pMemTable->pTsdb->pVnode->config.tsdbCfg.slLevel;
ASSERT(pPool != NULL);
pTbData = vnodeBufPoolMalloc(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
pTbData = vnodeBufPoolMallocAligned(pPool, sizeof(*pTbData) + SL_NODE_SIZE(maxLevel) * 2);
if (pTbData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -408,7 +412,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
if (fromPos) px = pos[pTbData->sl.level - 1];
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_BACKWARD(px, iLevel);
pn = SL_GET_NODE_BACKWARD(px, iLevel);
while (pn != pTbData->sl.pHead) {
tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
@ -418,7 +422,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
break;
} else {
px = pn;
pn = SL_NODE_BACKWARD(px, iLevel);
pn = SL_GET_NODE_BACKWARD(px, iLevel);
}
}
@ -438,7 +442,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
if (fromPos) px = pos[pTbData->sl.level - 1];
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_FORWARD(px, iLevel);
pn = SL_GET_NODE_FORWARD(px, iLevel);
while (pn != pTbData->sl.pTail) {
tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
@ -448,7 +452,7 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
break;
} else {
px = pn;
pn = SL_NODE_FORWARD(px, iLevel);
pn = SL_GET_NODE_FORWARD(px, iLevel);
}
}
@ -474,58 +478,53 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
int8_t level;
SMemSkipListNode *pNode;
SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse;
int64_t nSize;
// node
// create node
level = tsdbMemSkipListRandLevel(&pTbData->sl);
ASSERT(pPool != NULL);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level));
nSize = SL_NODE_SIZE(level);
pNode = (SMemSkipListNode *)vnodeBufPoolMallocAligned(pPool, nSize + pRow->len);
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pNode->level = level;
pNode->version = version;
pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len);
if (NULL == pNode->pTSRow) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pNode->pTSRow = (STSRow *)((char *)pNode + nSize);
memcpy(pNode->pTSRow, pRow, pRow->len);
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pn = pos[iLevel];
SMemSkipListNode *px;
if (forward) {
px = SL_NODE_FORWARD(pn, iLevel);
SL_NODE_BACKWARD(pNode, iLevel) = pn;
SL_NODE_FORWARD(pNode, iLevel) = px;
} else {
px = SL_NODE_BACKWARD(pn, iLevel);
SL_NODE_BACKWARD(pNode, iLevel) = px;
SL_NODE_FORWARD(pNode, iLevel) = pn;
// set node
if (forward) {
for (int8_t iLevel = 0; iLevel < level; iLevel++) {
SL_NODE_FORWARD(pNode, iLevel) = SL_NODE_FORWARD(pos[iLevel], iLevel);
SL_NODE_BACKWARD(pNode, iLevel) = pos[iLevel];
}
} else {
for (int8_t iLevel = 0; iLevel < level; iLevel++) {
SL_NODE_FORWARD(pNode, iLevel) = pos[iLevel];
SL_NODE_BACKWARD(pNode, iLevel) = SL_NODE_BACKWARD(pos[iLevel], iLevel);
}
}
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pn = pos[iLevel];
SMemSkipListNode *px;
// set forward and backward
if (forward) {
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pNext = pos[iLevel]->forwards[iLevel];
if (forward) {
px = SL_NODE_FORWARD(pn, iLevel);
SL_SET_NODE_FORWARD(pos[iLevel], iLevel, pNode);
SL_SET_NODE_BACKWARD(pNext, iLevel, pNode);
SL_NODE_FORWARD(pn, iLevel) = pNode;
SL_NODE_BACKWARD(px, iLevel) = pNode;
} else {
px = SL_NODE_BACKWARD(pn, iLevel);
SL_NODE_FORWARD(px, iLevel) = pNode;
SL_NODE_BACKWARD(pn, iLevel) = pNode;
pos[iLevel] = pNode;
}
} else {
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pPrev = pos[iLevel]->forwards[pos[iLevel]->level + iLevel];
pos[iLevel] = pNode;
SL_SET_NODE_FORWARD(pPrev, iLevel, pNode);
SL_SET_NODE_BACKWARD(pos[iLevel], iLevel, pNode);
pos[iLevel] = pNode;
}
}
pTbData->sl.size++;

View File

@ -115,6 +115,10 @@ struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t i
static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) {
SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
if (NULL == bufPage) {
return NULL;
}
if (forUpdate) {
setBufPageDirty(bufPage, true);
}

View File

@ -1726,8 +1726,10 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
return w;
}
w = getResultRowByPos(pBuf, &pResultRowInfo->cur, false)->win;
SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
if (pRow) {
w = pRow->win;
}
// in case of typical time window, we can calculate time window directly.
if (w.skey > ts || w.ekey < ts) {
w = doCalculateTimeWindow(ts, pInterval);

View File

@ -150,6 +150,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
pData->num = sizeof(SFilePage);
} else {
pData = getBufPage(pResultBuf, *currentPageId);
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return NULL;
}
pageId = *currentPageId;
if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
@ -200,6 +205,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (isIntervalQuery) {
if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
}
} else {
@ -208,6 +217,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (p1 != NULL) {
// todo
pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
}
}
@ -216,6 +229,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
releaseBufPage(pResultBuf, pPage);
}
@ -223,6 +240,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (pResult == NULL) {
ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
@ -260,6 +280,11 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
} else {
SPageInfo* pi = getLastPageInfo(list);
pData = getBufPage(pResultBuf, getPageId(pi));
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pageId = getPageId(pi);
if (pData->num + size > getBufPageSize(pResultBuf)) {
@ -912,7 +937,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) {
return;
T_LONG_JMP(pTaskInfo->env, terrno);
}
}
@ -993,6 +1018,11 @@ static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SR
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
SqlFunctionCtx* pCtx = pSup->pCtx;
@ -1036,6 +1066,10 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);

View File

@ -492,13 +492,17 @@ _error:
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
SDataGroupInfo* pGroupInfo = NULL;
void* pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
if (pPage == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
pGroupInfo->numOfRows += 1;
@ -595,6 +599,10 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
} else {
int32_t* curId = taosArrayGetLast(p->pPageList);
pPage = getBufPage(pInfo->pBuf, *curId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return pPage;
}
int32_t* rows = (int32_t*)pPage;
if (*rows >= pInfo->rowCapacity) {
@ -674,7 +682,8 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SDataGroupInfo* pGroupInfo =
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
@ -692,7 +701,11 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
void* page = getBufPage(pInfo->pBuf, *pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);

View File

@ -170,6 +170,10 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
}
*pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
if (NULL == *pPage) {
return NULL;
}
return (SResultRow*)((char*)(*pPage) + p1->offset);
}

View File

@ -636,6 +636,10 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
}
SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
if (NULL == pr) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
if (pr->closed) {
@ -1315,6 +1319,10 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
if (NULL == pResult) {
return;
}
SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
@ -1328,6 +1336,9 @@ static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf,
}
}
SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
if (NULL == bufPage) {
return;
}
setBufPageDirty(bufPage, true);
releaseBufPage(pResultBuf, bufPage);
}
@ -4114,6 +4125,9 @@ void destroyMAIOperatorInfo(void* param) {
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (NULL == pResult) {
return pResult;
}
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
return pResult;
}

View File

@ -270,6 +270,10 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, *pPgId);
if (NULL == pPage) {
return terrno;
}
code = blockDataFromBuf(pSource->src.pBlock, pPage);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -337,6 +341,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, *pPgId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);
if (code != TSDB_CODE_SUCCESS) {
return code;

View File

@ -784,7 +784,7 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
pEntryInfo->isNullRes = (pEntryInfo->numOfRes == 0) ? 1 : 0;
if (pCol->info.type == TSDB_DATA_TYPE_FLOAT) {
float v = *(float*)&pRes->v;
float v = GET_FLOAT_VAL(&pRes->v);
colDataAppend(pCol, currentRow, (const char*)&v, pEntryInfo->isNullRes);
} else {
colDataAppend(pCol, currentRow, (const char*)&pRes->v, pEntryInfo->isNullRes);

View File

@ -737,7 +737,12 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
}
if (!pBuf->assign) {
pBuf->v = *(int64_t*)tval;
if (type == TSDB_DATA_TYPE_FLOAT) {
GET_FLOAT_VAL(&pBuf->v) = GET_DOUBLE_VAL(tval);
} else {
pBuf->v = GET_INT64_VAL(tval);
}
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
@ -751,7 +756,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
*(int64_t*)&pBuf->v = val;
GET_INT64_VAL(&pBuf->v) = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
@ -765,7 +770,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
*(uint64_t*)&pBuf->v = val;
GET_UINT64_VAL(&pBuf->v) = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
@ -779,7 +784,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
*(double*)&pBuf->v = val;
GET_DOUBLE_VAL(&pBuf->v) = val;
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
@ -793,7 +798,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
float val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
*(float*)&pBuf->v = val;
GET_FLOAT_VAL(&pBuf->v) = val;
}
if (pCtx->subsidiaries.num > 0) {

View File

@ -460,13 +460,14 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
#else
snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name);
#endif
TdFilePtr file =
taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
msgInfo->code = TSDB_CODE_FILE_CORRUPTED;
goto _return;
}
int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
if (count != pFuncInfo->codeSize) {
fnError("udfd write udf shared library failed");

View File

@ -39,14 +39,6 @@
#define _SEND_FILE_STEP_ 1000
#endif
#if defined(WINDOWS)
typedef int32_t FileFd;
typedef int32_t SocketFd;
#else
typedef int32_t FileFd;
typedef int32_t SocketFd;
#endif
typedef int32_t FileFd;
typedef struct TdFile {
@ -54,19 +46,10 @@ typedef struct TdFile {
int refId;
FileFd fd;
FILE *fp;
} * TdFilePtr, TdFile;
} TdFile;
#define FILE_WITH_LOCK 1
typedef struct AutoDelFile *AutoDelFilePtr;
typedef struct AutoDelFile {
char *name;
AutoDelFilePtr lastAutoDelFilePtr;
} AutoDelFile;
static TdThreadMutex autoDelFileLock;
static AutoDelFilePtr nowAutoDelFilePtr = NULL;
static TdThreadOnce autoDelFileInit = PTHREAD_ONCE_INIT;
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) {
#ifdef WINDOWS
const char *tdengineTmpFileNamePrefix = "tdengine-";
@ -268,34 +251,6 @@ int32_t taosDevInoFile(TdFilePtr pFile, int64_t *stDev, int64_t *stIno) {
return 0;
}
void autoDelFileList() {
taosThreadMutexLock(&autoDelFileLock);
while (nowAutoDelFilePtr != NULL) {
taosRemoveFile(nowAutoDelFilePtr->name);
AutoDelFilePtr tmp = nowAutoDelFilePtr->lastAutoDelFilePtr;
taosMemoryFree(nowAutoDelFilePtr->name);
taosMemoryFree(nowAutoDelFilePtr);
nowAutoDelFilePtr = tmp;
}
taosThreadMutexUnlock(&autoDelFileLock);
taosThreadMutexDestroy(&autoDelFileLock);
}
void autoDelFileListInit() {
taosThreadMutexInit(&autoDelFileLock, NULL);
atexit(autoDelFileList);
}
void autoDelFileListAdd(const char *path) {
taosThreadOnce(&autoDelFileInit, autoDelFileListInit);
taosThreadMutexLock(&autoDelFileLock);
AutoDelFilePtr tmp = taosMemoryMalloc(sizeof(AutoDelFile));
tmp->lastAutoDelFilePtr = nowAutoDelFilePtr;
tmp->name = taosMemoryStrDup(path);
nowAutoDelFilePtr = tmp;
taosThreadMutexUnlock(&autoDelFileLock);
}
TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
int fd = -1;
FILE *fp = NULL;
@ -313,7 +268,6 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
assert(!(tdFileOptions & TD_FILE_EXCL));
fp = fopen(path, mode);
if (fp == NULL) {
// terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
} else {
@ -331,32 +285,44 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
access |= (tdFileOptions & TD_FILE_TEXT) ? O_TEXT : 0;
access |= (tdFileOptions & TD_FILE_EXCL) ? O_EXCL : 0;
#ifdef WINDOWS
fd = _open(path, access, _S_IREAD | _S_IWRITE);
int32_t pmode = _S_IREAD | _S_IWRITE;
if (tdFileOptions & TD_FILE_AUTO_DEL) {
pmode |= _O_TEMPORARY;
}
fd = _open(path, access, pmode);
#else
fd = open(path, access, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
if (fd == -1) {
// terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
}
TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile));
if (pFile == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
if (fd >= 0) close(fd);
if (fp != NULL) fclose(fp);
return NULL;
}
#if FILE_WITH_LOCK
taosThreadRwlockInit(&(pFile->rwlock), NULL);
#endif
pFile->fd = fd;
pFile->fp = fp;
pFile->refId = 0;
if (tdFileOptions & TD_FILE_AUTO_DEL) {
autoDelFileListAdd(path);
#ifdef WINDOWS
// do nothing, since the property of pmode is set with _O_TEMPORARY; the OS will recycle
// the file handle, as well as the space on disk.
#else
// Remove it instantly, so when the program exits normally/abnormally, the file
// will be automatically remove by OS.
unlink(path);
#endif
}
return pFile;
}

View File

@ -617,6 +617,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/tb_100w_data_order.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
@ -832,6 +834,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 3
@ -928,6 +931,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interp.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 4
@ -1034,6 +1038,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py

View File

@ -0,0 +1,146 @@
from wsgiref.headers import tspecials
from util.log import *
from util.cases import *
from util.sql import *
import numpy as np
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.rowNum = 10000
self.ts = 1537146000000
def run(self):
dbname = "db"
tdSql.prepare()
tdSql.execute(f'''create table {dbname}.ntb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.rowNum):
tdSql.execute(f"insert into {dbname}.ntb values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i % 127 + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i % 255 + 1, i + 1, i + 1, i + 1))
tdSql.execute('flush database db')
# test functions using sma result
tdSql.query(f"select count(col1),min(col1),max(col1),avg(col1),sum(col1),spread(col1),percentile(col1, 0),first(col1),last(col1) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 127)
tdSql.checkData(0, 3, 63.8449)
tdSql.checkData(0, 4, 638449)
tdSql.checkData(0, 5, 126.0)
tdSql.checkData(0, 6, 1.0)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, 94)
tdSql.query(f"select count(col2),min(col2),max(col2),avg(col2),sum(col2),spread(col2),percentile(col2, 0),first(col2),last(col2) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 10000)
tdSql.checkData(0, 3, 5000.5)
tdSql.checkData(0, 4, 50005000)
tdSql.checkData(0, 5, 9999.0)
tdSql.checkData(0, 6, 1.0)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, 10000)
tdSql.query(f"select count(col3),min(col3),max(col3),avg(col3),sum(col3),spread(col3),percentile(col3, 0),first(col3),last(col3) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 10000)
tdSql.checkData(0, 3, 5000.5)
tdSql.checkData(0, 4, 50005000)
tdSql.checkData(0, 5, 9999.0)
tdSql.checkData(0, 6, 1.0)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, 10000)
tdSql.query(f"select count(col4),min(col4),max(col4),avg(col4),sum(col4),spread(col4),percentile(col4, 0),first(col4),last(col4) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 10000)
tdSql.checkData(0, 3, 5000.5)
tdSql.checkData(0, 4, 50005000)
tdSql.checkData(0, 5, 9999.0)
tdSql.checkData(0, 6, 1.0)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, 10000)
tdSql.query(f"select count(col5),min(col5),max(col5),avg(col5),sum(col5),spread(col5),percentile(col5, 0),first(col5),last(col5) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 0.1)
tdSql.checkData(0, 2, 9999.09961)
tdSql.checkData(0, 3, 4999.599985846)
tdSql.checkData(0, 4, 49995999.858455874)
tdSql.checkData(0, 5, 9998.999609374)
tdSql.checkData(0, 6, 0.100000001)
tdSql.checkData(0, 7, 0.1)
tdSql.checkData(0, 8, 9999.09961)
tdSql.query(f"select count(col6),min(col6),max(col6),avg(col6),sum(col6),spread(col6),percentile(col6, 0),first(col6),last(col6) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 0.1)
tdSql.checkData(0, 2, 9999.100000000)
tdSql.checkData(0, 3, 4999.600000001)
tdSql.checkData(0, 4, 49996000.000005305)
tdSql.checkData(0, 5, 9999.000000000)
tdSql.checkData(0, 6, 0.1)
tdSql.checkData(0, 7, 0.1)
tdSql.checkData(0, 8, 9999.1)
tdSql.query(f"select count(col11),min(col11),max(col11),avg(col11),sum(col11),spread(col11),percentile(col11, 0),first(col11),last(col11) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 255)
tdSql.checkData(0, 3, 127.45)
tdSql.checkData(0, 4, 1274500)
tdSql.checkData(0, 5, 254.000000000)
tdSql.checkData(0, 6, 1.0)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, 55)
tdSql.query(f"select count(col12),min(col12),max(col12),avg(col12),sum(col12),spread(col12),percentile(col12, 0),first(col12),last(col12) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 10000)
tdSql.checkData(0, 3, 5000.5)
tdSql.checkData(0, 4, 50005000)
tdSql.checkData(0, 5, 9999.0)
tdSql.checkData(0, 6, 1.0)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, 10000)
tdSql.query(f"select count(col13),min(col13),max(col13),avg(col13),sum(col13),spread(col13),percentile(col13, 0),first(col13),last(col13) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 10000)
tdSql.checkData(0, 3, 5000.5)
tdSql.checkData(0, 4, 50005000)
tdSql.checkData(0, 5, 9999.0)
tdSql.checkData(0, 6, 1.0)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, 10000)
tdSql.query(f"select count(col14),min(col14),max(col14),avg(col14),sum(col14),spread(col14),percentile(col14, 0),first(col14),last(col14) from {dbname}.ntb")
tdSql.checkData(0, 0, 10000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 10000)
tdSql.checkData(0, 3, 5000.5)
tdSql.checkData(0, 4, 50005000)
tdSql.checkData(0, 5, 9999.0)
tdSql.checkData(0, 6, 1.0)
tdSql.checkData(0, 7, 1)
tdSql.checkData(0, 8, 10000)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())