Merge branch '3.0' into fix/TS-4540
This commit is contained in:
commit
ad6da4ab54
|
@ -390,10 +390,12 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
|||
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||
for (int32_t i = 0; i < mapSize; i++) {
|
||||
SSessionKey key = {0};
|
||||
SResultWindowInfo winfo = {0};
|
||||
SCountWindowInfo curWin = {0};
|
||||
buf = decodeSSessionKey(buf, &key);
|
||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
|
||||
setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
|
||||
buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
|
||||
}
|
||||
|
||||
// 2.twAggSup
|
||||
|
@ -694,6 +696,8 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->recvGetAll = false;
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
|
||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
|
||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
int32_t len = 0;
|
||||
|
@ -704,8 +708,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
doStreamCountDecodeOpState(buff, len, pOperator, true);
|
||||
taosMemoryFree(buff);
|
||||
}
|
||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,
|
||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamCountAgg, NULL, destroyStreamCountAggOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
||||
|
|
|
@ -406,6 +406,7 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
|||
if (!pInfo) {
|
||||
return buf;
|
||||
}
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
|
||||
// 4.checksum
|
||||
int32_t dataLen = len - sizeof(uint32_t);
|
||||
|
@ -423,6 +424,8 @@ void* doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
|||
SSessionKey key = {0};
|
||||
SResultWindowInfo winfo = {0};
|
||||
buf = decodeSSessionKey(buf, &key);
|
||||
pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &winfo.sessionWin, pAggSup->gap,
|
||||
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
}
|
||||
|
@ -735,6 +738,8 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->reCkBlock = false;
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
int32_t len = 0;
|
||||
|
@ -746,8 +751,6 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
taosMemoryFree(buff);
|
||||
}
|
||||
|
||||
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
||||
|
|
|
@ -2533,7 +2533,6 @@ int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outL
|
|||
|
||||
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
||||
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
||||
key->pStatePos->pRowBuff = NULL;
|
||||
buf = decodeSSessionKey(buf, &key->sessionWin);
|
||||
return buf;
|
||||
}
|
||||
|
@ -2591,6 +2590,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
if (!pInfo) {
|
||||
return buf;
|
||||
}
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
|
||||
// 5.checksum
|
||||
if (isParent) {
|
||||
|
@ -2609,6 +2609,8 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
|
|||
SSessionKey key = {0};
|
||||
SResultWindowInfo winfo = {0};
|
||||
buf = decodeSSessionKey(buf, &key);
|
||||
pAggSup->stateStore.streamStateSessionAddIfNotExist(pAggSup->pState, &winfo.sessionWin, pAggSup->gap,
|
||||
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
}
|
||||
|
@ -2992,6 +2994,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
pInfo->recvGetAll = false;
|
||||
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
int32_t len = 0;
|
||||
|
@ -3002,8 +3006,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
doStreamSessionDecodeOpState(buff, len, pOperator, true);
|
||||
taosMemoryFree(buff);
|
||||
}
|
||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
||||
|
@ -3538,6 +3540,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
|||
if (!pInfo) {
|
||||
return buf;
|
||||
}
|
||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
|
||||
// 5.checksum
|
||||
if (isParent) {
|
||||
|
@ -3556,6 +3559,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
|
|||
SSessionKey key = {0};
|
||||
SResultWindowInfo winfo = {0};
|
||||
buf = decodeSSessionKey(buf, &key);
|
||||
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL,
|
||||
pAggSup->stateKeySize, compareStateKey,
|
||||
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
|
||||
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
|
||||
}
|
||||
|
@ -3873,6 +3879,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
|
||||
pInfo->recvGetAll = false;
|
||||
|
||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
// for stream
|
||||
void* buff = NULL;
|
||||
int32_t len = 0;
|
||||
|
@ -3884,8 +3892,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
taosMemoryFree(buff);
|
||||
}
|
||||
|
||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
||||
|
|
|
@ -4547,7 +4547,7 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getQueryTimeRange(pCxt, pSelect->pWhere, &pSelect->timeRange);
|
||||
}
|
||||
if (pSelect->pWhere != NULL) {
|
||||
if (pSelect->pWhere != NULL && pCxt->pParseCxt->topicQuery == false) {
|
||||
setTableVgroupsFromEqualTbnameCond(pCxt, pSelect);
|
||||
}
|
||||
return code;
|
||||
|
@ -8135,7 +8135,9 @@ static int32_t adjustTagsForCreateTable(STranslateContext* pCxt, SCreateStreamSt
|
|||
SColumnDefNode* pDef = (SColumnDefNode*)pTagDef;
|
||||
if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) {
|
||||
SNode* pFunc = NULL;
|
||||
int32_t code = createCastFunc(pCxt, pTagExpr, pDef->dataType, &pFunc);
|
||||
SDataType defType = pDef->dataType;
|
||||
defType.bytes = calcTypeBytes(defType);
|
||||
int32_t code = createCastFunc(pCxt, pTagExpr, defType, &pFunc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -940,6 +940,10 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
|||
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
|
||||
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
|
||||
|
||||
if (isHb && persistHandle && trans->pHandle == 0) {
|
||||
trans->pHandle = rpcAllocHandle();
|
||||
}
|
||||
|
||||
if (pJob && pTask) {
|
||||
SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
|
||||
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
|
||||
|
|
|
@ -906,6 +906,7 @@ int32_t chkpMayDelObsolete(void* arg, int64_t chkpId, char* path) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
static int32_t chkpIdComp(const void* a, const void* b) {
|
||||
int64_t x = *(int64_t*)a;
|
||||
int64_t y = *(int64_t*)b;
|
||||
|
@ -964,6 +965,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
|
|||
taosMemoryFree(chkpPath);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) {
|
||||
|
|
|
@ -29,7 +29,7 @@ class BackendEnv : public ::testing::Test {
|
|||
|
||||
void *backendCreate() {
|
||||
const char *streamPath = "/tmp";
|
||||
void *p = NULL;
|
||||
void * p = NULL;
|
||||
|
||||
// char *absPath = NULL;
|
||||
// // SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(streamPath, -1, 2);
|
||||
|
@ -52,7 +52,7 @@ SStreamState *stateCreate(const char *path) {
|
|||
}
|
||||
void *backendOpen() {
|
||||
streamMetaInit();
|
||||
const char *path = "/tmp/backend";
|
||||
const char * path = "/tmp/backend";
|
||||
SStreamState *p = stateCreate(path);
|
||||
ASSERT(p != NULL);
|
||||
|
||||
|
@ -79,7 +79,7 @@ void *backendOpen() {
|
|||
|
||||
const char *val = "value data";
|
||||
int32_t len = 0;
|
||||
char *newVal = NULL;
|
||||
char * newVal = NULL;
|
||||
streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
||||
ASSERT(len == strlen(val));
|
||||
}
|
||||
|
@ -100,7 +100,7 @@ void *backendOpen() {
|
|||
|
||||
const char *val = "value data";
|
||||
int32_t len = 0;
|
||||
char *newVal = NULL;
|
||||
char * newVal = NULL;
|
||||
int32_t code = streamStateGet_rocksdb(p, &key, (void **)&newVal, &len);
|
||||
ASSERT(code != 0);
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ void *backendOpen() {
|
|||
|
||||
winkey.groupId = 0;
|
||||
winkey.ts = tsArray[0];
|
||||
char *val = NULL;
|
||||
char * val = NULL;
|
||||
int32_t len = 0;
|
||||
|
||||
pCurr = streamStateSeekKeyNext_rocksdb(p, &winkey);
|
||||
|
@ -157,7 +157,7 @@ void *backendOpen() {
|
|||
key.ts = tsArray[i];
|
||||
key.exprIdx = i;
|
||||
|
||||
char *val = NULL;
|
||||
char * val = NULL;
|
||||
int32_t len = 0;
|
||||
streamStateFuncGet_rocksdb(p, &key, (void **)&val, &len);
|
||||
ASSERT(len == strlen("Value"));
|
||||
|
@ -168,7 +168,7 @@ void *backendOpen() {
|
|||
key.ts = tsArray[i];
|
||||
key.exprIdx = i;
|
||||
|
||||
char *val = NULL;
|
||||
char * val = NULL;
|
||||
int32_t len = 0;
|
||||
streamStateFuncDel_rocksdb(p, &key);
|
||||
}
|
||||
|
@ -213,7 +213,7 @@ void *backendOpen() {
|
|||
{
|
||||
SSessionKey key;
|
||||
memset(&key, 0, sizeof(key));
|
||||
char *val = NULL;
|
||||
char * val = NULL;
|
||||
int32_t vlen = 0;
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCurr, &key, (void **)&val, &vlen);
|
||||
ASSERT(code == 0);
|
||||
|
@ -260,7 +260,7 @@ void *backendOpen() {
|
|||
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
||||
key.groupId = (uint64_t)(i);
|
||||
key.ts = tsArray[i];
|
||||
char *val = NULL;
|
||||
char * val = NULL;
|
||||
int32_t vlen = 0;
|
||||
ASSERT(streamStateFillGet_rocksdb(p, &key, (void **)&val, &vlen) == 0);
|
||||
taosMemoryFreeClear(val);
|
||||
|
@ -272,7 +272,7 @@ void *backendOpen() {
|
|||
SStreamStateCur *pCurr = streamStateFillGetCur_rocksdb(p, &key);
|
||||
ASSERT(pCurr != NULL);
|
||||
|
||||
char *val = NULL;
|
||||
char * val = NULL;
|
||||
int32_t vlen = 0;
|
||||
ASSERT(0 == streamStateFillGetKVByCur_rocksdb(pCurr, &key, (const void **)&val, &vlen));
|
||||
ASSERT(vlen == strlen("Value"));
|
||||
|
@ -296,7 +296,7 @@ void *backendOpen() {
|
|||
SWinKey key = {0}; // {.groupId = (uint64_t)(i), .ts = tsArray[i]};
|
||||
key.groupId = (uint64_t)(i);
|
||||
key.ts = tsArray[i];
|
||||
char *val = NULL;
|
||||
char * val = NULL;
|
||||
int32_t vlen = 0;
|
||||
ASSERT(streamStateFillDel_rocksdb(p, &key) == 0);
|
||||
taosMemoryFreeClear(val);
|
||||
|
@ -338,7 +338,7 @@ void *backendOpen() {
|
|||
char key[128] = {0};
|
||||
sprintf(key, "tbname_%d", i);
|
||||
|
||||
char *val = NULL;
|
||||
char * val = NULL;
|
||||
int32_t len = 0;
|
||||
code = streamDefaultGet_rocksdb(p, key, (void **)&val, &len);
|
||||
ASSERT(code == 0);
|
||||
|
@ -354,7 +354,7 @@ TEST_F(BackendEnv, checkOpen) {
|
|||
SStreamState *p = (SStreamState *)backendOpen();
|
||||
int64_t tsStart = taosGetTimestampMs();
|
||||
{
|
||||
void *pBatch = streamStateCreateBatch();
|
||||
void * pBatch = streamStateCreateBatch();
|
||||
int32_t size = 0;
|
||||
for (int i = 0; i < size; i++) {
|
||||
char key[128] = {0};
|
||||
|
@ -368,7 +368,7 @@ TEST_F(BackendEnv, checkOpen) {
|
|||
streamStateDestroyBatch(pBatch);
|
||||
}
|
||||
{
|
||||
void *pBatch = streamStateCreateBatch();
|
||||
void * pBatch = streamStateCreateBatch();
|
||||
int32_t size = 0;
|
||||
char valBuf[256] = {0};
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
@ -385,7 +385,7 @@ TEST_F(BackendEnv, checkOpen) {
|
|||
// do checkpoint 2
|
||||
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 2);
|
||||
{
|
||||
void *pBatch = streamStateCreateBatch();
|
||||
void * pBatch = streamStateCreateBatch();
|
||||
int32_t size = 0;
|
||||
char valBuf[256] = {0};
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
@ -407,12 +407,22 @@ TEST_F(BackendEnv, checkOpen) {
|
|||
// taosMkDir(dump);
|
||||
taosMulMkDir(dump);
|
||||
SBkdMgt *mgt = bkdMgtCreate((char *)path);
|
||||
SArray *result = taosArrayInit(4, sizeof(void *));
|
||||
SArray * result = taosArrayInit(4, sizeof(void *));
|
||||
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 3, result, (char *)dump);
|
||||
|
||||
taskDbDoCheckpoint(p->pTdbState->pOwner->pBackend, 4);
|
||||
|
||||
taosArrayClear(result);
|
||||
bkdMgtGetDelta(mgt, p->pTdbState->idstr, 4, result, (char *)dump);
|
||||
bkdMgtDestroy(mgt);
|
||||
streamStateClose((SStreamState *)p, true);
|
||||
// {
|
||||
// taosRemoveDir("/tmp/backend");
|
||||
// const char * path = "/tmp/backend";
|
||||
// SStreamState *p = stateCreate(path);
|
||||
// }
|
||||
taosRemoveDir(path);
|
||||
// streamStateClose((SStreamState *)p, true);
|
||||
}
|
||||
|
||||
TEST_F(BackendEnv, backendChkp) { const char *path = "/tmp"; }
|
||||
|
@ -430,6 +440,20 @@ TEST_F(BackendEnv, backendUtil) {
|
|||
ASSERT_EQ(nextPow2((uint32_t)(kvDict[i].k)), kvDict[i].v);
|
||||
}
|
||||
}
|
||||
TEST_F(BackendEnv, oldBackendInit) {
|
||||
const char *path = "/tmp/backend1";
|
||||
taosMulMkDir(path);
|
||||
{
|
||||
SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10);
|
||||
streamBackendCleanup((void *)p);
|
||||
}
|
||||
{
|
||||
SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10);
|
||||
streamBackendCleanup((void *)p);
|
||||
}
|
||||
|
||||
taosRemoveDir(path);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
testing::InitGoogleTest(&argc, argv);
|
||||
|
|
|
@ -584,8 +584,8 @@ void* destroyConnPool(SCliThrd* pThrd) {
|
|||
|
||||
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||
void* pool = pThrd->pool;
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||
STrans* pTranInst = pThrd->pTransInst;
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||
if (plist == NULL) {
|
||||
SConnList list = {0};
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
||||
|
@ -867,17 +867,18 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
|
|||
QUEUE_INIT(&conn->q);
|
||||
|
||||
conn->broken = true;
|
||||
if (conn->list == NULL) {
|
||||
conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||
}
|
||||
|
||||
if (conn->list != NULL) {
|
||||
SConnList* connList = conn->list;
|
||||
connList->list->numOfConn--;
|
||||
connList->size--;
|
||||
} else {
|
||||
if (pThrd->pool) {
|
||||
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
|
||||
if (connList != NULL) connList->list->numOfConn--;
|
||||
if (conn->list) {
|
||||
SConnList* list = conn->list;
|
||||
list->list->numOfConn--;
|
||||
if (conn->status == ConnInPool) {
|
||||
list->size--;
|
||||
}
|
||||
}
|
||||
|
||||
conn->list = NULL;
|
||||
pThrd->newConnCount--;
|
||||
|
||||
|
|
|
@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg);
|
|||
|
||||
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
||||
|
||||
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
|
||||
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
|
||||
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
||||
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
||||
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
||||
|
@ -527,6 +527,10 @@ void uvOnSendCb(uv_write_t* req, int status) {
|
|||
if (!transQueueEmpty(&conn->srvMsgs)) {
|
||||
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
|
||||
if (msg->type == Register && conn->status == ConnAcquire) {
|
||||
if (conn->regArg.init) {
|
||||
transFreeMsg(conn->regArg.msg.pCont);
|
||||
conn->regArg.init = 0;
|
||||
}
|
||||
conn->regArg.notifyCount = 0;
|
||||
conn->regArg.init = 1;
|
||||
conn->regArg.msg = msg->msg;
|
||||
|
@ -1350,6 +1354,11 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) {
|
|||
return;
|
||||
}
|
||||
transQueuePop(&conn->srvMsgs);
|
||||
|
||||
if (conn->regArg.init) {
|
||||
transFreeMsg(conn->regArg.msg.pCont);
|
||||
conn->regArg.init = 0;
|
||||
}
|
||||
conn->regArg.notifyCount = 0;
|
||||
conn->regArg.init = 1;
|
||||
conn->regArg.msg = msg->msg;
|
||||
|
|
|
@ -21,6 +21,8 @@ print create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0
|
|||
|
||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0,2);
|
||||
$loop_count = 0
|
||||
|
@ -457,6 +459,8 @@ print create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0
|
|||
|
||||
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(a) c4, min(c) c5, max(id) c from t1 state_window(a);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791212000,2,2,3,1.0,1);
|
||||
sql insert into t1 values(1648791213000,1,2,3,1.0,1);
|
||||
sql insert into t1 values(1648791213000,1,2,4,1.0,2);
|
||||
|
@ -504,6 +508,9 @@ sql create table t1(ts timestamp, a int, b int , c int, d double, id int);
|
|||
print create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
||||
|
||||
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3 as select _wstart, count(*) c1, sum(b) c3 from t1 state_window(a);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 values(1648791212000,1,2,3,1.0,1);
|
||||
sql insert into t1 values(1648791213000,2,2,3,1.0,1);
|
||||
sql insert into t1 values(1648791214000,3,2,4,1.0,2);
|
||||
|
@ -557,6 +564,8 @@ print create stream if not exists streams4 trigger window_close IGNORE EXPIRED 0
|
|||
|
||||
sql create stream if not exists streams4 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart AS startts, min(c1),count(c1) from t1 state_window(c1);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into t1 (ts, c1) values (1668073288209, 11);
|
||||
sql insert into t1 (ts, c1) values (1668073288210, 11);
|
||||
sql insert into t1 (ts, c1) values (1668073288211, 11);
|
||||
|
@ -745,6 +754,9 @@ sql create table b (c timestamp, d int, e int , f int, g double);
|
|||
print create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
||||
|
||||
sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a);
|
||||
|
||||
sleep 1000
|
||||
|
||||
sql insert into b values(1648791213000,NULL,NULL,NULL,NULL);
|
||||
sql select * from streamt order by c1, c2, c3;
|
||||
|
||||
|
|
|
@ -366,6 +366,7 @@ class TDTestCase:
|
|||
tdSql.execute(f"insert into `test`.b1 using `test`.`b`(key) tags('1') (time, task_id) values ('2024-03-04 12:50:01.000', '32') `test`.b2 using `test`.`b`(key) tags('2') (time, task_id) values ('2024-03-04 12:50:01.000', '43') `test`.b3 using `test`.`b`(key) tags('3') (time, task_id) values ('2024-03-04 12:50:01.000', '123456')")
|
||||
|
||||
tdSql.execute(f'create topic tt as select tbname,task_id,key from b')
|
||||
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
|
@ -375,7 +376,7 @@ class TDTestCase:
|
|||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["tt"])
|
||||
consumer.subscribe(["tt"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
|
@ -395,9 +396,36 @@ class TDTestCase:
|
|||
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
def consume_ts_4544(self):
|
||||
tdSql.execute(f'create database if not exists d1')
|
||||
tdSql.execute(f'use d1')
|
||||
tdSql.execute(f'create table stt(ts timestamp, i int) tags(t int)')
|
||||
tdSql.execute(f'insert into tt1 using stt tags(1) values(now, 1) (now+1s, 2)')
|
||||
tdSql.execute(f'insert into tt2 using stt tags(2) values(now, 1) (now+1s, 2)')
|
||||
tdSql.execute(f'insert into tt3 using stt tags(3) values(now, 1) (now+1s, 2)')
|
||||
tdSql.execute(f'insert into tt1 using stt tags(1) values(now+5s, 11) (now+10s, 12)')
|
||||
|
||||
tdSql.execute(f'create topic topic_in as select * from stt where tbname in ("tt2")')
|
||||
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["topic_in"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
consumer.close()
|
||||
|
||||
def run(self):
|
||||
self.consumeTest()
|
||||
self.consume_ts_4544()
|
||||
self.consume_TS_4540_Test()
|
||||
|
||||
tdSql.prepare()
|
||||
|
|
Loading…
Reference in New Issue