Merge remote-tracking branch 'origin/3.0' into feature/sync2-merge
This commit is contained in:
commit
21445a717c
|
@ -892,6 +892,7 @@ typedef struct {
|
|||
int32_t numOfVgroups;
|
||||
int32_t numOfStables;
|
||||
int32_t buffer;
|
||||
int32_t cacheSize;
|
||||
int32_t pageSize;
|
||||
int32_t pages;
|
||||
int32_t daysPerFile;
|
||||
|
|
|
@ -84,7 +84,7 @@ typedef struct SOutputData {
|
|||
* @param pHandle output
|
||||
* @return error code
|
||||
*/
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam);
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
|
||||
|
||||
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat);
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ extern "C" {
|
|||
|
||||
#define SHOW_CREATE_TB_RESULT_COLS 2
|
||||
#define SHOW_CREATE_TB_RESULT_FIELD1_LEN (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE)
|
||||
#define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_BINARY_LEN + VARSTR_HEADER_SIZE)
|
||||
#define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_ALLOWED_SQL_LEN * 3)
|
||||
|
||||
#define SHOW_LOCAL_VARIABLES_RESULT_COLS 2
|
||||
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
|
||||
|
|
|
@ -338,7 +338,7 @@ void doDestroyRequest(void *p) {
|
|||
|
||||
SRequestObj *pRequest = (SRequestObj *)p;
|
||||
|
||||
int64_t reqId = pRequest->self;
|
||||
uint64_t reqId = pRequest->requestId;
|
||||
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
|
||||
|
||||
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
|
||||
|
|
|
@ -893,16 +893,26 @@ void tTagFree(STag *pTag) {
|
|||
}
|
||||
|
||||
char *tTagValToData(const STagVal *value, bool isJson) {
|
||||
if (!value) return NULL;
|
||||
if (!value) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
char *data = NULL;
|
||||
int8_t typeBytes = 0;
|
||||
if (isJson) {
|
||||
typeBytes = CHAR_BYTES;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(value->type)) {
|
||||
data = taosMemoryCalloc(1, typeBytes + VARSTR_HEADER_SIZE + value->nData);
|
||||
if (data == NULL) return NULL;
|
||||
if (isJson) *data = value->type;
|
||||
if (data == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (isJson) {
|
||||
*data = value->type;
|
||||
}
|
||||
|
||||
varDataLen(data + typeBytes) = value->nData;
|
||||
memcpy(varDataVal(data + typeBytes), value->pData, value->nData);
|
||||
} else {
|
||||
|
|
|
@ -2765,6 +2765,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
|
|||
if (tEncodeI32(&encoder, pRsp->numOfVgroups) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->numOfStables) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->buffer) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->cacheSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->pageSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->pages) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->daysPerFile) < 0) return -1;
|
||||
|
@ -2804,6 +2805,7 @@ int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
|
|||
if (tDecodeI32(&decoder, &pRsp->numOfVgroups) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->numOfStables) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->buffer) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->cacheSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->pageSize) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->pages) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->daysPerFile) < 0) return -1;
|
||||
|
|
|
@ -848,6 +848,7 @@ static int32_t mndProcessGetDbCfgReq(SRpcMsg *pReq) {
|
|||
cfgRsp.numOfVgroups = pDb->cfg.numOfVgroups;
|
||||
cfgRsp.numOfStables = pDb->cfg.numOfStables;
|
||||
cfgRsp.buffer = pDb->cfg.buffer;
|
||||
cfgRsp.cacheSize = pDb->cfg.cacheLastSize;
|
||||
cfgRsp.pageSize = pDb->cfg.pageSize;
|
||||
cfgRsp.pages = pDb->cfg.pages;
|
||||
cfgRsp.daysPerFile = pDb->cfg.daysPerFile;
|
||||
|
|
|
@ -1523,9 +1523,9 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
|||
return pReader->pMemSchema;
|
||||
}
|
||||
|
||||
taosMemoryFree(pReader->pMemSchema);
|
||||
taosMemoryFreeClear(pReader->pMemSchema);
|
||||
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
} else {
|
||||
|
@ -2274,7 +2274,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
_end:
|
||||
pResBlock->info.uid = pBlockScanInfo->uid;
|
||||
pResBlock->info.uid = (pBlockScanInfo != NULL)? pBlockScanInfo->uid:0;
|
||||
blockDataUpdateTsWindow(pResBlock, 0);
|
||||
|
||||
setComposedBlockFlag(pReader, true);
|
||||
|
@ -2569,7 +2569,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
if (pScanInfo == NULL) {
|
||||
tsdbError("failed to get table, uid:%" PRIu64 ", %s", pBlockInfo->uid, pReader->idStr);
|
||||
tsdbError("failed to get table scan-info, %s", pReader->idStr);
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -268,10 +268,10 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbFName, S
|
|||
|
||||
len += sprintf(
|
||||
buf2 + VARSTR_HEADER_SIZE,
|
||||
"CREATE DATABASE `%s` BUFFER %d CACHEMODEL '%s' COMP %d DURATION %dm "
|
||||
"CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm "
|
||||
"WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d "
|
||||
"STRICT '%s' WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d",
|
||||
dbFName, pCfg->buffer, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod,
|
||||
dbFName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, pCfg->walFsyncPeriod,
|
||||
pCfg->maxRows, pCfg->minRows, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, pCfg->pages,
|
||||
pCfg->pageSize, prec, pCfg->replications, strictStr(pCfg->strict), pCfg->walLevel, pCfg->numOfVgroups,
|
||||
1 == pCfg->numOfStables);
|
||||
|
@ -496,7 +496,12 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
|
|||
colDataAppend(pCol1, 0, buf1, false);
|
||||
|
||||
SColumnInfoData* pCol2 = taosArrayGet(pBlock->pDataBlock, 1);
|
||||
char buf2[SHOW_CREATE_TB_RESULT_FIELD2_LEN] = {0};
|
||||
char* buf2 = taosMemoryMalloc(SHOW_CREATE_TB_RESULT_FIELD2_LEN);
|
||||
if (NULL == buf2) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t len = 0;
|
||||
|
||||
if (TSDB_SUPER_TABLE == pCfg->tableType) {
|
||||
|
@ -512,6 +517,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
|
|||
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ") TAGS (");
|
||||
code = appendTagValues(buf2, &len, pCfg);
|
||||
if (code) {
|
||||
taosMemoryFree(buf2);
|
||||
return code;
|
||||
}
|
||||
len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, ")");
|
||||
|
@ -527,6 +533,8 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p
|
|||
|
||||
colDataAppend(pCol2, 0, buf2, false);
|
||||
|
||||
taosMemoryFree(buf2);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -254,10 +254,12 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) {
|
|||
|
||||
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle,
|
||||
void* pParam) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle));
|
||||
if (NULL == deleter) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink;
|
||||
|
@ -270,17 +272,30 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
|
|||
deleter->pManager = pManager;
|
||||
deleter->pDeleter = pDeleterNode;
|
||||
deleter->pSchema = pDataSink->pInputDataBlockDesc;
|
||||
|
||||
if(pParam == NULL) {
|
||||
code = TSDB_CODE_QRY_INVALID_INPUT;
|
||||
qError("invalid input param in creating data deleter, code%s", tstrerror(code));
|
||||
goto _end;
|
||||
}
|
||||
|
||||
deleter->pParam = pParam;
|
||||
deleter->status = DS_BUF_EMPTY;
|
||||
deleter->queryEnd = false;
|
||||
deleter->pDataBlocks = taosOpenQueue();
|
||||
taosThreadMutexInit(&deleter->mutex, NULL);
|
||||
if (NULL == deleter->pDataBlocks) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
*pHandle = deleter;
|
||||
return code;
|
||||
|
||||
_end:
|
||||
if (deleter != NULL) {
|
||||
destroyDataSinker((SDataSinkHandle*)deleter);
|
||||
taosMemoryFree(deleter);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
*pHandle = deleter;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -231,8 +231,10 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|||
while (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
|
||||
SDataDispatchBuf* pBuf = NULL;
|
||||
taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
|
||||
taosMemoryFreeClear(pBuf->pData);
|
||||
taosFreeQitem(pBuf);
|
||||
if (pBuf != NULL) {
|
||||
taosMemoryFreeClear(pBuf->pData);
|
||||
taosFreeQitem(pBuf);
|
||||
}
|
||||
}
|
||||
taosCloseQueue(pDispatcher->pDataBlocks);
|
||||
taosThreadMutexDestroy(&pDispatcher->mutex);
|
||||
|
|
|
@ -33,7 +33,7 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) {
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
|
||||
switch ((int)nodeType(pDataSink)) {
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
|
||||
|
@ -42,7 +42,9 @@ int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHand
|
|||
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
|
||||
return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam);
|
||||
}
|
||||
return TSDB_CODE_FAILED;
|
||||
|
||||
qError("invalid input node type:%d, %s", nodeType(pDataSink), id);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
|
||||
|
|
|
@ -370,7 +370,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
|||
goto _error;
|
||||
}
|
||||
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFreeClear(pSinkParam);
|
||||
}
|
||||
|
|
|
@ -3101,7 +3101,9 @@ _error:
|
|||
destroyAggOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
cleanupExprSupp(&pOperator->exprSupp);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -421,14 +421,14 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode
|
|||
goto _error;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
@ -453,7 +453,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode
|
|||
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
destroyGroupOperatorInfo(pInfo);
|
||||
if (pInfo != NULL) {
|
||||
destroyGroupOperatorInfo(pInfo);
|
||||
}
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -465,16 +465,14 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int
|
|||
colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows);
|
||||
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
|
||||
colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows);
|
||||
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
|
||||
taosMemoryFree(data);
|
||||
}
|
||||
} else { // todo opt for json tag
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
colDataAppend(pColInfoData, i, data, false);
|
||||
}
|
||||
}
|
||||
|
||||
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
|
||||
IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
|
||||
taosMemoryFree(data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -263,29 +263,14 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo
|
|||
static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) {
|
||||
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||
int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType;
|
||||
if (type == QUERY_NODE_COLUMN) {
|
||||
if (type == QUERY_NODE_COLUMN || type == QUERY_NODE_OPERATOR || type == QUERY_NODE_FUNCTION) {
|
||||
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
||||
|
||||
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
||||
char* p = colDataGetData(pSrcCol, rowIndex);
|
||||
saveColData(pRow, i, p, isNull);
|
||||
} else if (type == QUERY_NODE_OPERATOR) {
|
||||
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
||||
|
||||
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
||||
char* p = colDataGetData(pSrcCol, rowIndex);
|
||||
saveColData(pRow, i, p, isNull);
|
||||
} else if (type == QUERY_NODE_FUNCTION) {
|
||||
int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]);
|
||||
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
|
||||
|
||||
bool isNull = colDataIsNull_s(pSrcCol, rowIndex);
|
||||
char* p = colDataGetData(pSrcCol, rowIndex);
|
||||
saveColData(pRow, i, p, isNull);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
|
|
@ -4411,6 +4411,11 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
|||
taosArrayPush(pInfo->pChildren, &pChildOp);
|
||||
}
|
||||
}
|
||||
|
||||
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
|
||||
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
|
|
|
@ -330,7 +330,7 @@ static SFunctionNode* createFunction(const char* pName, SNodeList* pParameterLis
|
|||
if (NULL == pFunc) {
|
||||
return NULL;
|
||||
}
|
||||
strcpy(pFunc->functionName, pName);
|
||||
snprintf(pFunc->functionName, sizeof(pFunc->functionName), "%s", pName);
|
||||
pFunc->pParameterList = pParameterList;
|
||||
if (TSDB_CODE_SUCCESS != getFuncInfo(pFunc)) {
|
||||
pFunc->pParameterList = NULL;
|
||||
|
@ -408,10 +408,6 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pMergeFunc = pFunc;
|
||||
} else {
|
||||
if (NULL != pFunc) {
|
||||
pFunc->pParameterList = NULL;
|
||||
nodesDestroyNode((SNode*)pFunc);
|
||||
}
|
||||
nodesDestroyList(pParameterList);
|
||||
}
|
||||
|
||||
|
|
|
@ -96,16 +96,19 @@ double findOnlyResult(tMemBucket *pMemBucket) {
|
|||
}
|
||||
|
||||
int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times);
|
||||
SArray *list = *(SArray **)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
|
||||
assert(list->size == 1);
|
||||
SArray **pList = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId));
|
||||
if (pList != NULL) {
|
||||
SArray *list = *pList;
|
||||
assert(list->size == 1);
|
||||
|
||||
int32_t *pageId = taosArrayGet(list, 0);
|
||||
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
|
||||
assert(pPage->num == 1);
|
||||
int32_t *pageId = taosArrayGet(list, 0);
|
||||
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
|
||||
assert(pPage->num == 1);
|
||||
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data);
|
||||
return v;
|
||||
double v = 0;
|
||||
GET_TYPED_DATA(v, double, pMemBucket->type, pPage->data);
|
||||
return v;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -190,28 +190,20 @@ int32_t nodesReleaseAllocator(int64_t allocatorId) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
|
||||
if (NULL == pAllocator) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = taosThreadMutexTryLock(&pAllocator->mutex);
|
||||
if (EBUSY != code) {
|
||||
if (NULL == g_pNodeAllocator) {
|
||||
nodesError("allocator id %" PRIx64
|
||||
" release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator "
|
||||
"function is called!",
|
||||
allocatorId);
|
||||
if (0 == code) {
|
||||
taosThreadMutexUnlock(&pAllocator->mutex);
|
||||
}
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SNodeAllocator* pAllocator = g_pNodeAllocator;
|
||||
g_pNodeAllocator = NULL;
|
||||
taosThreadMutexUnlock(&pAllocator->mutex);
|
||||
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
|
||||
}
|
||||
|
||||
|
||||
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) {
|
||||
if (allocatorId <= 0) {
|
||||
return 0;
|
||||
|
|
|
@ -529,7 +529,7 @@ int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
|||
}
|
||||
|
||||
if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) {
|
||||
qError("tDeserializeSDbCfgRsp failed, msgSize:%d", msgSize);
|
||||
qError("tDeserializeSDbCfgRsp failed, msgSize:%d,dbCfgRsp:%lu", msgSize, sizeof(out));
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
return NULL;
|
||||
}
|
||||
if (pInit->label) {
|
||||
tstrncpy(pRpc->label, pInit->label, TSDB_LABEL_LEN);
|
||||
tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label));
|
||||
}
|
||||
|
||||
pRpc->compressSize = pInit->compressSize;
|
||||
|
@ -75,7 +75,7 @@ void* rpcOpen(const SRpcInit* pInit) {
|
|||
}
|
||||
pRpc->parent = pInit->parent;
|
||||
if (pInit->user) {
|
||||
memcpy(pRpc->user, pInit->user, TSDB_UNI_LEN);
|
||||
tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
|
||||
}
|
||||
|
||||
int64_t refId = transAddExHandle(transGetInstMgt(), pRpc);
|
||||
|
@ -87,7 +87,7 @@ void rpcClose(void* arg) {
|
|||
tInfo("start to close rpc");
|
||||
transRemoveExHandle(transGetInstMgt(), (int64_t)arg);
|
||||
transReleaseExHandle(transGetInstMgt(), (int64_t)arg);
|
||||
tInfo("rpc is closed");
|
||||
tInfo("end to close rpc");
|
||||
return;
|
||||
}
|
||||
void rpcCloseImpl(void* arg) {
|
||||
|
|
|
@ -5,7 +5,7 @@ sleep 50
|
|||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql create database test vgroups 1
|
||||
sql create database test vgroups 1;
|
||||
sql select * from information_schema.ins_databases
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
|
@ -29,4 +29,100 @@ if $rows != 0 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql create database test1 vgroups 4;
|
||||
sql use test1;
|
||||
sql create stable st(ts timestamp, a int, b int) tags(t int);
|
||||
sql create table t1 using st tags(1);
|
||||
sql create table t2 using st tags(2);
|
||||
|
||||
sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s);
|
||||
sql create stream stream3 trigger max_delay 1s into streamt3 as select _wstart, sum(a) from st interval(10s);
|
||||
sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s);
|
||||
sql create stream stream5 trigger max_delay 1s into streamt5 as select _wstart, sum(a) from t1 interval(10s);
|
||||
sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s);
|
||||
sql create stream stream7 trigger max_delay 1s into streamt7 as select _wstart, sum(a) from st session(ts, 10s);
|
||||
sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s);
|
||||
sql create stream stream9 trigger max_delay 1s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s);
|
||||
sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b);
|
||||
sql create stream stream11 trigger max_delay 1s into streamt11 as select _wstart, sum(a) from t1 state_window(b);
|
||||
|
||||
sql insert into t1 values(1648791213000,1,1);
|
||||
sql insert into t1 values(1648791213001,2,1);
|
||||
sql insert into t1 values(1648791213002,3,1);
|
||||
|
||||
sql insert into t1 values(1648791233000,4,2);
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop1:
|
||||
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt2;
|
||||
|
||||
if $rows != 1 then
|
||||
print ======streamt2=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt3;
|
||||
if $rows != 2 then
|
||||
print ======streamt3=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt4;
|
||||
if $rows != 1 then
|
||||
print ======streamt4=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt5;
|
||||
if $rows != 2 then
|
||||
print ======streamt5=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt6;
|
||||
if $rows != 1 then
|
||||
print ======streamt6=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt7;
|
||||
if $rows != 2 then
|
||||
print ======streamt7=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt8;
|
||||
if $rows != 1 then
|
||||
print ======streamt8=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt9;
|
||||
if $rows != 2 then
|
||||
print ======streamt9=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
sql select * from streamt10;
|
||||
if $rows != 1 then
|
||||
print ======streamt10=$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from streamt11;
|
||||
if $rows != 2 then
|
||||
print ======streamt11=$rows
|
||||
goto loop1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -81,6 +81,7 @@ class TDTestCase:
|
|||
dbname = "test"
|
||||
stb = f"{dbname}.meters"
|
||||
self.installTaosd(bPath,cPath)
|
||||
os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ")
|
||||
tableNumbers=100
|
||||
recordNumbers1=100
|
||||
recordNumbers2=1000
|
||||
|
@ -96,8 +97,8 @@ class TDTestCase:
|
|||
tdLog.info(f"Base client version is {oldClientVersion}")
|
||||
|
||||
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{oldServerVersion}")
|
||||
tdLog.info(f"taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
os.system(f"taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
tdLog.info(f" LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
os.system(f"LD_LIBRARY_PATH=/usr/lib taosBenchmark -t {tableNumbers} -n {recordNumbers1} -y ")
|
||||
sleep(3)
|
||||
|
||||
# tdsqlF.query(f"select count(*) from {stb}")
|
||||
|
|
|
@ -610,4 +610,3 @@ python3 ./test.py -f 2-query/last_row.py -Q 4
|
|||
python3 ./test.py -f 2-query/tsbsQuery.py -Q 4
|
||||
#python3 ./test.py -f 2-query/sml.py -Q 4
|
||||
python3 ./test.py -f 2-query/interp.py -Q 4
|
||||
|
||||
|
|
Loading…
Reference in New Issue