Merge branch 'main' of https://github.com/taosdata/TDengine into feat/compact_with_time_range
This commit is contained in:
commit
dc0d7c26ea
|
@ -2,7 +2,7 @@
|
|||
IF (DEFINED VERNUMBER)
|
||||
SET(TD_VER_NUMBER ${VERNUMBER})
|
||||
ELSE ()
|
||||
SET(TD_VER_NUMBER "3.0.2.6")
|
||||
SET(TD_VER_NUMBER "3.0.3.0")
|
||||
ENDIF ()
|
||||
|
||||
IF (DEFINED VERCOMPATIBLE)
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
# taosadapter
|
||||
ExternalProject_Add(taosadapter
|
||||
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
||||
GIT_TAG 7920f98
|
||||
GIT_TAG 97d717d
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -219,6 +219,7 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
|||
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
|
||||
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
|
||||
void qStreamCloseTsdbReader(void* task);
|
||||
void resetTaskInfo(qTaskInfo_t tinfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -61,6 +61,16 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
|
|||
rpcFreeCont(pRsp->pCont);
|
||||
}
|
||||
|
||||
void dmEpSetToStr(char *buf, int32_t len, SEpSet *epSet) {
|
||||
int32_t n = 0;
|
||||
n += snprintf(buf + n, len - n, "%s", "{");
|
||||
for (int i = 0; i < epSet->numOfEps; i++) {
|
||||
n += snprintf(buf + n, len - n, "%s:%d%s", epSet->eps[i].fqdn, epSet->eps[i].port,
|
||||
(i + 1 < epSet->numOfEps ? ", " : ""));
|
||||
}
|
||||
n += snprintf(buf + n, len - n, "%s", "}");
|
||||
}
|
||||
|
||||
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||
SStatusReq req = {0};
|
||||
|
||||
|
@ -119,11 +129,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
|||
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
||||
rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp);
|
||||
if (rpcRsp.code != 0) {
|
||||
dError("failed to send status req since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSet.numOfEps,
|
||||
epSet.inUse);
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
dDebug("index:%d, mnode ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
|
||||
}
|
||||
dmRotateMnodeEpSet(pMgmt->pData);
|
||||
char tbuf[256];
|
||||
dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
|
||||
dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse);
|
||||
}
|
||||
dmProcessStatusRsp(pMgmt, &rpcRsp);
|
||||
}
|
||||
|
|
|
@ -166,6 +166,7 @@ int32_t dmReadEps(SDnodeData *pData);
|
|||
int32_t dmWriteEps(SDnodeData *pData);
|
||||
void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
|
||||
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
||||
void dmRotateMnodeEpSet(SDnodeData *pData);
|
||||
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
|
||||
bool dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port);
|
||||
|
|
|
@ -325,6 +325,28 @@ void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
|
|||
taosThreadRwlockUnlock(&pData->lock);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void dmSwapEps(SEp *epLhs, SEp *epRhs) {
|
||||
SEp epTmp;
|
||||
|
||||
epTmp.port = epLhs->port;
|
||||
tstrncpy(epTmp.fqdn, epLhs->fqdn, tListLen(epTmp.fqdn));
|
||||
|
||||
epLhs->port = epRhs->port;
|
||||
tstrncpy(epLhs->fqdn, epRhs->fqdn, tListLen(epLhs->fqdn));
|
||||
|
||||
epRhs->port = epTmp.port;
|
||||
tstrncpy(epRhs->fqdn, epTmp.fqdn, tListLen(epRhs->fqdn));
|
||||
}
|
||||
|
||||
void dmRotateMnodeEpSet(SDnodeData *pData) {
|
||||
taosThreadRwlockRdlock(&pData->lock);
|
||||
SEpSet *pEpSet = &pData->mnodeEps;
|
||||
for (int i = 1; i < pEpSet->numOfEps; i++) {
|
||||
dmSwapEps(&pEpSet->eps[i - 1], &pEpSet->eps[i]);
|
||||
}
|
||||
taosThreadRwlockUnlock(&pData->lock);
|
||||
}
|
||||
|
||||
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
dmGetMnodeEpSet(pData, pEpSet);
|
||||
dTrace("msg is redirected, handle:%p num:%d use:%d", pMsg->info.handle, pEpSet->numOfEps, pEpSet->inUse);
|
||||
|
|
|
@ -257,9 +257,6 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TS
|
|||
|
||||
SLastCol lastCol = {.ts = keyTs, .colVal = colVal};
|
||||
if (IS_VAR_DATA_TYPE(colVal.type) && colVal.value.nData > 0) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLast, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1265,17 +1262,58 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) {
|
||||
int32_t code = 0;
|
||||
static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
|
||||
SArray *pColArray = taosArrayInit(pTSchema->numOfCols, sizeof(SLastCol));
|
||||
if (NULL == pColArray) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pTSchema->numOfCols; ++i) {
|
||||
SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[i].colId, pTSchema->columns[i].type)};
|
||||
taosArrayPush(pColArray, &col);
|
||||
}
|
||||
*ppColArray = pColArray;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
|
||||
int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
|
||||
*ppDst = taosMemoryMalloc(len);
|
||||
if (NULL == *ppDst) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(*ppDst, pSrc, len);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
|
||||
if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
|
||||
return cloneTSchema(pReader->pSchema, &pReader->pCurrSchema);
|
||||
}
|
||||
|
||||
if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pReader->pCurrSchema);
|
||||
return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema);
|
||||
}
|
||||
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray, SCacheRowsReader *pr) {
|
||||
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
int16_t nLastCol = pTSchema->numOfCols;
|
||||
int16_t iCol = 0;
|
||||
int16_t noneCol = 0;
|
||||
bool setNoneCol = false;
|
||||
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
|
||||
bool hasRow = false;
|
||||
SArray *pColArray = NULL;
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
|
||||
int32_t code = initLastColArray(pTSchema, &pColArray);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
CacheNextRowIter iter = {0};
|
||||
|
@ -1290,6 +1328,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
break;
|
||||
}
|
||||
|
||||
hasRow = true;
|
||||
|
||||
code = updateTSchema(TSDBROW_SVERSION(pRow), pr, uid);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
goto _err;
|
||||
}
|
||||
pTSchema = pr->pCurrSchema;
|
||||
int16_t nCol = pTSchema->numOfCols;
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
|
@ -1297,29 +1344,27 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
|
||||
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal});
|
||||
|
||||
for (iCol = 1; iCol < nCol; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
break;
|
||||
}
|
||||
SLastCol *pCol = taosArrayGet(pColArray, iCol);
|
||||
if (pCol->colVal.cid != pTSchema->columns[iCol].colId) {
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
|
||||
|
||||
SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
|
||||
*pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
|
||||
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
|
||||
if (lastCol.colVal.value.pData == NULL) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
if (pCol->colVal.value.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
|
||||
if (taosArrayPush(pColArray, &lastCol) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
|
||||
if (COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
|
||||
|
@ -1376,6 +1421,9 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
//*ppColArray = NULL;
|
||||
// taosArrayDestroy(pColArray);
|
||||
//} else {
|
||||
if (!hasRow) {
|
||||
taosArrayClear(pColArray);
|
||||
}
|
||||
*ppColArray = pColArray;
|
||||
//}
|
||||
|
||||
|
@ -1390,43 +1438,6 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) {
|
||||
int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols;
|
||||
*ppDst = taosMemoryMalloc(len);
|
||||
if (NULL == *ppDst) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
memcpy(*ppDst, pSrc, len);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t updateTSchema(int32_t sversion, SCacheRowsReader *pReader, uint64_t uid) {
|
||||
if (NULL == pReader->pCurrSchema && sversion == pReader->pSchema->version) {
|
||||
return cloneTSchema(pReader->pSchema, &pReader->pCurrSchema);
|
||||
}
|
||||
|
||||
if (NULL != pReader->pCurrSchema && sversion == pReader->pCurrSchema->version) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pReader->pCurrSchema);
|
||||
return metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pCurrSchema);
|
||||
}
|
||||
|
||||
static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) {
|
||||
SArray *pColArray = taosArrayInit(pTSchema->numOfCols, sizeof(SLastCol));
|
||||
if (NULL == pColArray) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pTSchema->numOfCols; ++i) {
|
||||
SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[i].colId, pTSchema->columns[i].type)};
|
||||
taosArrayPush(pColArray, &col);
|
||||
}
|
||||
*ppColArray = pColArray;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr) {
|
||||
STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
|
||||
int16_t nLastCol = pTSchema->numOfCols;
|
||||
|
|
|
@ -38,16 +38,17 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
*(int64_t*)p->buf = pColVal->ts;
|
||||
allNullRow = false;
|
||||
} else {
|
||||
int32_t slotId = slotIds[i];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
|
||||
int32_t slotId = slotIds[i];
|
||||
// add check for null value, caused by the modification of table schema (new column added).
|
||||
if (pColVal == NULL) {
|
||||
if (slotId >= taosArrayGetSize(pRow)) {
|
||||
p->ts = 0;
|
||||
p->isNull = true;
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
continue;
|
||||
}
|
||||
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
|
||||
p->ts = pColVal->ts;
|
||||
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
||||
allNullRow = p->isNull & allNullRow;
|
||||
|
@ -79,7 +80,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
|||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
colDataSetVal(pColInfoData, numOfRows, (const char*)&pColVal->ts, false);
|
||||
} else {
|
||||
int32_t slotId = slotIds[i];
|
||||
int32_t slotId = slotIds[i];
|
||||
// add check for null value, caused by the modification of table schema (new column added).
|
||||
if (slotId >= taosArrayGetSize(pRow)) {
|
||||
colDataSetNULL(pColInfoData, numOfRows);
|
||||
continue;
|
||||
}
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
SColVal* pVal = &pColVal->colVal;
|
||||
|
||||
|
|
|
@ -104,6 +104,12 @@ static void clearStreamBlock(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
void resetTaskInfo(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
pTaskInfo->code = 0;
|
||||
clearStreamBlock(pTaskInfo->pRoot);
|
||||
}
|
||||
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
|
@ -618,7 +624,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|||
pTaskInfo->cost.start = taosGetTimestampUs();
|
||||
}
|
||||
|
||||
if (isTaskKilled(pTaskInfo) && pTaskInfo->code != TSDB_CODE_QRY_IN_EXEC) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
clearStreamBlock(pTaskInfo->pRoot);
|
||||
atomic_store_64(&pTaskInfo->owner, 0);
|
||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||
|
|
|
@ -946,6 +946,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
|||
}
|
||||
|
||||
FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
||||
qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists));
|
||||
taosArrayClear(pInfo->pBlockLists);
|
||||
pInfo->validBlockIndex = 0;
|
||||
}
|
||||
|
|
|
@ -2462,6 +2462,9 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
|
|||
int32_t numOfElems = 0;
|
||||
|
||||
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
|
||||
if (colDataIsNull_s(pCol, i)) {
|
||||
continue;
|
||||
}
|
||||
char* data = colDataGetData(pCol, i);
|
||||
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
|
||||
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
|
||||
|
|
|
@ -2500,6 +2500,12 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
STempTableNode* pTempTable = (STempTableNode*)pTable;
|
||||
code = translateSubquery(pCxt, pTempTable->pSubquery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pTempTable->pSubquery) &&
|
||||
((SSelectStmt*)pTempTable->pSubquery)->isEmptyResult &&
|
||||
isSelectStmt(pCxt->pCurrStmt)) {
|
||||
((SSelectStmt*)pCxt->pCurrStmt)->isEmptyResult = true;
|
||||
}
|
||||
|
||||
pTempTable->table.precision = getStmtPrecision(pTempTable->pSubquery);
|
||||
pTempTable->table.singleTable = stmtIsSingleTable(pTempTable->pSubquery);
|
||||
code = addNamespace(pCxt, pTempTable);
|
||||
|
|
|
@ -20,6 +20,11 @@
|
|||
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
|
||||
int32_t code;
|
||||
void* exec = pTask->exec.executor;
|
||||
while(atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
|
||||
qError("stream task wait for the end of fill history");
|
||||
taosMsleep(2);
|
||||
continue;
|
||||
}
|
||||
|
||||
// set input
|
||||
const SStreamQueueItem* pItem = (const SStreamQueueItem*)data;
|
||||
|
@ -58,6 +63,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
|
|||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if ((code = qExecTask(exec, &output, &ts)) < 0) {
|
||||
if (code == TSDB_CODE_QRY_IN_EXEC) {
|
||||
resetTaskInfo(exec);
|
||||
}
|
||||
/*ASSERT(false);*/
|
||||
qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId,
|
||||
terrstr());
|
||||
|
@ -121,8 +129,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
taosArrayDestroy(pRes);
|
||||
return -1;
|
||||
continue;
|
||||
}
|
||||
if (output == NULL) {
|
||||
if (qStreamRecoverScanFinished(exec)) {
|
||||
|
|
|
@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
|
|||
return qStreamRestoreParam(exec);
|
||||
}
|
||||
int32_t streamSetStatusNormal(SStreamTask* pTask) {
|
||||
pTask->taskStatus = TASK_STATUS__NORMAL;
|
||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -139,7 +139,7 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
|||
if (taosRenameFile(file, realfile) != 0) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
sInfo("vgId:%d, succeed to write raft store file:%s, len:%d", pNode->vgId, realfile, len);
|
||||
sInfo("vgId:%d, succeed to write raft store file:%s, term:%" PRId64, pNode->vgId, realfile, pStore->currentTerm);
|
||||
|
||||
_OVER:
|
||||
if (pJson != NULL) tjsonDelete(pJson);
|
||||
|
|
|
@ -733,7 +733,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
|
||||
#,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart1.py
|
||||
|
|
|
@ -3,6 +3,8 @@ import taos
|
|||
import sys
|
||||
import os
|
||||
import time
|
||||
import inspect
|
||||
from taos.tmq import Consumer
|
||||
|
||||
from pathlib import Path
|
||||
from util.log import *
|
||||
|
@ -99,6 +101,7 @@ class TDTestCase:
|
|||
|
||||
|
||||
def run(self):
|
||||
scriptsPath = os.path.dirname(os.path.realpath(__file__))
|
||||
distro_id = distro.id()
|
||||
if distro_id == "alpine":
|
||||
tdLog.info(f"alpine skip compatibility test")
|
||||
|
@ -128,19 +131,18 @@ class TDTestCase:
|
|||
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{BASEVERSION}")
|
||||
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
sleep(3)
|
||||
os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
|
||||
os.system(f"sed -i 's/\/etc\/taos/{cPath}/' 0-others/tmqBasic.json ")
|
||||
# os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ")
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ')
|
||||
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
|
||||
|
||||
# tdsqlF.query(f"select count(*) from {stb}")
|
||||
# tdsqlF.checkData(0,0,tableNumbers*recordNumbers1)
|
||||
os.system("pkill taosd")
|
||||
self.checkProcessPid("taosd")
|
||||
|
||||
print(f"start taosd: nohup taosd -c {cPath} & ")
|
||||
os.system(f" nohup taosd -c {cPath} & " )
|
||||
sleep(10)
|
||||
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
|
||||
os.system("pkill taosd") # make sure all the data are saved in disk.
|
||||
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
|
||||
os.system("pkill taosd") # make sure all the data are saved in disk.
|
||||
self.checkProcessPid("taosd")
|
||||
|
||||
|
||||
|
@ -161,10 +163,12 @@ class TDTestCase:
|
|||
|
||||
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version-{nowServerVersion}")
|
||||
tdsql.query(f"select count(*) from {stb}")
|
||||
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
|
||||
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
|
||||
tdsql.query(f"select count(*) from {stb}")
|
||||
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
|
||||
tdsql.checkData(0,0,tableNumbers*recordNumbers1)
|
||||
# tdsql.query("show streams;")
|
||||
# os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
|
||||
# tdsql.query("show streams;")
|
||||
# tdsql.query(f"select count(*) from {stb}")
|
||||
# tdsql.checkData(0,0,tableNumbers*recordNumbers2)
|
||||
tdsql.query(f"select count(*) from db4096.stb0")
|
||||
tdsql.checkData(0,0,50000)
|
||||
|
||||
|
@ -183,13 +187,58 @@ class TDTestCase:
|
|||
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
|
||||
tdsql.query("select * from db.ct4")
|
||||
tdsql.checkData(0,1,14)
|
||||
print(1)
|
||||
tdsql=tdCom.newTdSql()
|
||||
tdsql.query("describe information_schema.ins_databases;")
|
||||
qRows=tdsql.queryRows
|
||||
for i in range(qRows) :
|
||||
if tdsql.queryResult[i][0]=="retentions" :
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
comFlag=True
|
||||
j=0
|
||||
while comFlag:
|
||||
for i in range(qRows) :
|
||||
if tdsql.queryResult[i][0] == "retentions" :
|
||||
print("parameters include retentions")
|
||||
comFlag=False
|
||||
break
|
||||
else :
|
||||
comFlag=True
|
||||
j=j+1
|
||||
if j == qRows:
|
||||
print("parameters don't include retentions")
|
||||
caller = inspect.getframeinfo(inspect.stack()[0][0])
|
||||
args = (caller.filename, caller.lineno)
|
||||
tdLog.exit("%s(%d) failed" % args)
|
||||
tdsql.query("show streams;")
|
||||
tdsql.checkRows(2)
|
||||
tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
|
||||
tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": "tg75",
|
||||
"client.id": "124",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"enable.auto.commit": "true",
|
||||
"experimental.snapshot.enable": "true",
|
||||
}
|
||||
)
|
||||
consumer.subscribe(["tmq_test_topic"])
|
||||
|
||||
while True:
|
||||
res = consumer.poll(10)
|
||||
if not res:
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
val = res.value()
|
||||
|
||||
for block in val:
|
||||
print(block.fetchall())
|
||||
tdsql.query("show topics;")
|
||||
tdsql.checkRows(1)
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
from taos.tmq import Consumer
|
||||
import taos
|
||||
import taosrest
|
||||
import socket
|
||||
|
||||
|
||||
def init_tmq_env(db, topic):
|
||||
conn = taos.connect()
|
||||
# conn.execute("create dnode test209")
|
||||
# conn.execute("create dnode test216")
|
||||
# conn.execute("create mnode on dnode 2")
|
||||
# conn.execute("create mnode on dnode 3")
|
||||
|
||||
conn.execute("drop topic if exists {}".format(topic))
|
||||
conn.execute("drop database if exists {}".format(db))
|
||||
conn.execute("create database if not exists {} replica 1 ".format(db))
|
||||
conn.select_db(db)
|
||||
conn.execute(
|
||||
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
|
||||
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
|
||||
conn.execute("insert into tb1 values (now+10s, 1, 1.0, 'tmq test')")
|
||||
conn.execute("insert into tb2 values (now+100s, 2, 2.0, 'tmq test')")
|
||||
conn.execute("insert into tb3 values (now+20s, 3, 3.0, 'tmq test')")
|
||||
conn.execute("insert into tb3 values (now+30s, 4, 4.0, 'tmq test4')")
|
||||
|
||||
def init_tmq_rest_env(db, topic):
|
||||
host = socket.gethostname()
|
||||
conn = taosrest.connect(url=f"http://{host}:6041")
|
||||
|
||||
# conn.execute("create dnode test209")
|
||||
# conn.execute("create dnode test216")
|
||||
# conn.execute("create mnode on dnode 2")
|
||||
# conn.execute("create mnode on dnode 3")
|
||||
|
||||
conn.execute("drop topic if exists {}".format(topic))
|
||||
conn.execute("drop database if exists {}".format(db))
|
||||
conn.execute("create database if not exists {} replica 3 ".format(db))
|
||||
conn.select_db(db)
|
||||
conn.execute(
|
||||
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
|
||||
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
|
||||
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
|
||||
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
|
||||
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
|
||||
conn.execute("insert into tb1 values (now+10s, 1, 1.0, 'tmq test')")
|
||||
conn.execute("insert into tb2 values (now+100s, 2, 2.0, 'tmq test')")
|
||||
conn.execute("insert into tb3 values (now+20s, 3, 3.0, 'tmq test')")
|
||||
conn.execute("insert into tb3 values (now+30s, 4, 4.0, 'tmq test4')")
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
conn = taos.connect()
|
||||
|
||||
init_tmq_env("tmq_test", "tmq_test_topic") # init env
|
||||
# init_tmq_rest_env("tmq_test", "tmq_test_topic")
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": "tg75",
|
||||
"client.id": "124",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"enable.auto.commit": "true",
|
||||
"experimental.snapshot.enable": "true",
|
||||
}
|
||||
)
|
||||
consumer.subscribe(["tmq_test_topic"])
|
||||
|
||||
while True:
|
||||
res = consumer.poll(10)
|
||||
if not res:
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
val = res.value()
|
||||
|
||||
for block in val:
|
||||
print(block.fetchall())
|
|
@ -0,0 +1,24 @@
|
|||
{
|
||||
"filetype": "subscribe",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "127.0.0.1",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"confirm_parameter_prompt": "no",
|
||||
"tmq_info": {
|
||||
"concurrent": 1,
|
||||
"poll_delay": 10000,
|
||||
"group.id": "grpId_0",
|
||||
"client.id": "clientId",
|
||||
"auto.offset.reset": "earliest",
|
||||
"enable.auto.commit": "true",
|
||||
"auto.commit.interval.ms": 1000,
|
||||
"enable.heartbeat.background": "true",
|
||||
"experimental.snapshot.enable": "true",
|
||||
"msg.with.table.name": "false",
|
||||
"topic_list": [
|
||||
{"name": "tmq_topic_1", "sql": "select current,voltage,phase from test.meters where voltage <= 106 ;"}
|
||||
]
|
||||
}
|
||||
}
|
|
@ -86,7 +86,7 @@ class TDTestCase:
|
|||
self.ctbNums=ctbNums
|
||||
rowNUms=5000
|
||||
ts=1451606400000
|
||||
tdSql.execute(f"create database {dbname};")
|
||||
tdSql.execute(f"create database {dbname} cachemodel 'both';")
|
||||
tdSql.execute(f"use {dbname} ")
|
||||
tdSql.execute(f'''
|
||||
create table {stabname1} (ts timestamp,latitude double,longitude double,elevation double,velocity double,heading double,grade double,fuel_consumption double) tags (name binary(30),fleet binary(30),driver binary(30),model binary(30),device_version binary(30),load_capacity double,fuel_capacity double,nominal_fuel_consumption double);
|
||||
|
|
Loading…
Reference in New Issue