Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
5db5e7c38d
|
@ -1,4 +1,6 @@
|
||||||
build/
|
build/
|
||||||
|
compile_commands.json
|
||||||
|
.cache
|
||||||
.ycm_extra_conf.py
|
.ycm_extra_conf.py
|
||||||
.vscode/
|
.vscode/
|
||||||
.idea/
|
.idea/
|
||||||
|
|
|
@ -41,6 +41,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
|
||||||
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
|
||||||
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
|
||||||
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
|
||||||
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
|
||||||
|
@ -113,7 +117,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
|
||||||
// message for topic
|
// message for topic
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
|
//TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
|
||||||
|
|
||||||
#ifndef TAOS_MESSAGE_C
|
#ifndef TAOS_MESSAGE_C
|
||||||
|
|
|
@ -58,18 +58,25 @@ void walStop(twalh);
|
||||||
void walClose(twalh);
|
void walClose(twalh);
|
||||||
|
|
||||||
//write
|
//write
|
||||||
int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen);
|
int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen);
|
||||||
void walFsync(twalh, bool forceHint);
|
int64_t walWrite(twalh, void* body, int32_t bodyLen);
|
||||||
//int32_t walCommit(twalh, int64_t ver);
|
int64_t walWriteBatch(twalh, void* body, int32_t* bodyLen, int32_t batchSize);
|
||||||
//int32_t walRollback(twalh, int64_t ver);
|
|
||||||
|
//apis for lifecycle management
|
||||||
|
void walFsync(twalh, bool force);
|
||||||
|
int32_t walCommit(twalh, int64_t ver);
|
||||||
|
//truncate after
|
||||||
|
int32_t walRollback(twalh, int64_t ver);
|
||||||
|
//notify that previous log can be pruned safely
|
||||||
|
int32_t walPrune(twalh, int64_t ver);
|
||||||
|
|
||||||
//read
|
//read
|
||||||
int32_t walRead(twalh, SWalHead **, int64_t ver);
|
int32_t walRead(twalh, SWalHead **, int64_t ver);
|
||||||
int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum);
|
int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum);
|
||||||
|
|
||||||
//life cycle
|
//lifecycle check
|
||||||
int32_t walDataPersisted(twalh, int64_t ver);
|
|
||||||
int32_t walFirstVer(twalh);
|
int32_t walFirstVer(twalh);
|
||||||
|
int32_t walPersistedVer(twalh);
|
||||||
int32_t walLastVer(twalh);
|
int32_t walLastVer(twalh);
|
||||||
//int32_t walDataCorrupted(twalh);
|
//int32_t walDataCorrupted(twalh);
|
||||||
|
|
||||||
|
|
|
@ -22,8 +22,76 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct STQ STQ;
|
typedef struct tmqMsgHead {
|
||||||
|
int32_t headLen;
|
||||||
|
int32_t msgVer;
|
||||||
|
int64_t cgId;
|
||||||
|
int32_t topicLen;
|
||||||
|
char topic[];
|
||||||
|
} tmqMsgHead;
|
||||||
|
|
||||||
|
//TODO: put msgs into common
|
||||||
|
typedef struct tmqConnectReq {
|
||||||
|
tmqMsgHead head;
|
||||||
|
|
||||||
|
} tmqConnectReq;
|
||||||
|
|
||||||
|
typedef struct tmqConnectResp {
|
||||||
|
|
||||||
|
} tmqConnectResp;
|
||||||
|
|
||||||
|
typedef struct tmqDisconnectReq {
|
||||||
|
|
||||||
|
} tmqDisconnectReq;
|
||||||
|
|
||||||
|
typedef struct tmqDisconnectResp {
|
||||||
|
|
||||||
|
} tmqDiconnectResp;
|
||||||
|
|
||||||
|
typedef struct tmqConsumeReq {
|
||||||
|
|
||||||
|
} tmqConsumeReq;
|
||||||
|
|
||||||
|
typedef struct tmqConsumeResp {
|
||||||
|
|
||||||
|
} tmqConsumeResp;
|
||||||
|
|
||||||
|
typedef struct tmqSubscribeReq {
|
||||||
|
|
||||||
|
} tmqSubscribeReq;
|
||||||
|
|
||||||
|
typedef struct tmqSubscribeResp {
|
||||||
|
|
||||||
|
} tmqSubscribeResp;
|
||||||
|
|
||||||
|
typedef struct tmqHeartbeatReq {
|
||||||
|
|
||||||
|
} tmqHeartbeatReq;
|
||||||
|
|
||||||
|
typedef struct tmqHeartbeatResp {
|
||||||
|
|
||||||
|
} tmqHeartbeatResp;
|
||||||
|
|
||||||
|
typedef struct tqTopicVhandle {
|
||||||
|
//name
|
||||||
|
//
|
||||||
|
//executor for filter
|
||||||
|
//
|
||||||
|
//callback for mnode
|
||||||
|
//
|
||||||
|
} tqTopicVhandle;
|
||||||
|
|
||||||
|
typedef struct STQ {
|
||||||
|
//the set for topics
|
||||||
|
//key=topicName: str
|
||||||
|
//value=tqTopicVhandle
|
||||||
|
|
||||||
|
//a map
|
||||||
|
//key=<topic: str, cgId: int64_t>
|
||||||
|
//value=consumeOffset: int64_t
|
||||||
|
} STQ;
|
||||||
|
|
||||||
|
//init in each vnode
|
||||||
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
|
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
|
||||||
void tqCleanUp(STQ*);
|
void tqCleanUp(STQ*);
|
||||||
|
|
||||||
|
@ -32,7 +100,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version);
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
|
|
||||||
//void* will be replace by a msg type
|
//void* will be replace by a msg type
|
||||||
int tqHandleMsg(STQ*, void* msg);
|
int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#ifndef TDENGINE_HEAP_H
|
||||||
|
#define TDENGINE_HEAP_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
|
||||||
|
struct HeapNode;
|
||||||
|
|
||||||
|
/* Return non-zero if a < b. */
|
||||||
|
typedef int (*HeapCompareFn)(const struct HeapNode* a, const struct HeapNode* b);
|
||||||
|
|
||||||
|
typedef struct HeapNode {
|
||||||
|
struct HeapNode* left;
|
||||||
|
struct HeapNode* right;
|
||||||
|
struct HeapNode* parent;
|
||||||
|
} HeapNode;
|
||||||
|
|
||||||
|
/* A binary min heap. The usual properties hold: the root is the lowest
|
||||||
|
* element in the set, the height of the tree is at most log2(nodes) and
|
||||||
|
* it's always a complete binary tree.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
typedef struct {
|
||||||
|
HeapNode* min;
|
||||||
|
size_t nelts;
|
||||||
|
HeapCompareFn compFn;
|
||||||
|
} Heap;
|
||||||
|
|
||||||
|
|
||||||
|
Heap* heapCreate(HeapCompareFn fn);
|
||||||
|
|
||||||
|
void heapDestroy(Heap *heap);
|
||||||
|
|
||||||
|
HeapNode* heapMin(const Heap* heap);
|
||||||
|
|
||||||
|
void heapInsert(Heap* heap, HeapNode* node);
|
||||||
|
|
||||||
|
void heapRemove(Heap* heap, struct HeapNode* node);
|
||||||
|
|
||||||
|
void heapDequeue(Heap* heap);
|
||||||
|
|
||||||
|
size_t heapSize(Heap *heap);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif // TDENGINE_HASH_H
|
|
@ -16,10 +16,11 @@
|
||||||
#ifndef _TD_VNODE_MAIN_H_
|
#ifndef _TD_VNODE_MAIN_H_
|
||||||
#define _TD_VNODE_MAIN_H_
|
#define _TD_VNODE_MAIN_H_
|
||||||
|
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
#include "vnodeInt.h"
|
|
||||||
|
|
||||||
int32_t vnodeInitMain();
|
int32_t vnodeInitMain();
|
||||||
void vnodeCleanupMain();
|
void vnodeCleanupMain();
|
||||||
|
|
|
@ -3,10 +3,11 @@ add_library(tq ${TQ_SRC})
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
tq
|
tq
|
||||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq"
|
||||||
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal"
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
PRIVATE "${CMAKE_SOURCE_DIR}/include/os"
|
PRIVATE "${CMAKE_SOURCE_DIR}/include/os"
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
os
|
wal
|
||||||
)
|
)
|
||||||
|
|
|
@ -18,23 +18,39 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
|
#define TQ_BUFFER_SIZE 8
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
//implement the array index
|
typedef struct tqBufferItem {
|
||||||
//implement the ring buffer
|
int64_t offset;
|
||||||
|
void* executor;
|
||||||
|
void* content;
|
||||||
|
} tqBufferItem;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct tqGroupHandle {
|
||||||
|
char* topic; //c style, end with '\0'
|
||||||
|
int64_t cgId;
|
||||||
|
void* ahandle;
|
||||||
|
int64_t consumeOffset;
|
||||||
|
int32_t head;
|
||||||
|
int32_t tail;
|
||||||
|
tqBufferItem buffer[TQ_BUFFER_SIZE];
|
||||||
|
} tqGroupHandle;
|
||||||
|
|
||||||
//create persistent storage for meta info such as consuming offset
|
//create persistent storage for meta info such as consuming offset
|
||||||
//return value > 0: cgId
|
//return value > 0: cgId
|
||||||
//return value <= 0: error code
|
//return value <= 0: error code
|
||||||
int tqCreateGroup(STQ*);
|
int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle);
|
||||||
//create ring buffer in memory and load consuming offset
|
//create ring buffer in memory and load consuming offset
|
||||||
int tqOpenGroup(STQ*, int cgId);
|
int tqOpenTCGroup(STQ*, const char* topic, int cgId);
|
||||||
//destroy ring buffer and persist consuming offset
|
//destroy ring buffer and persist consuming offset
|
||||||
int tqCloseGroup(STQ*, int cgId);
|
int tqCloseTCGroup(STQ*, const char* topic, int cgId);
|
||||||
//delete persistent storage for meta info
|
//delete persistent storage for meta info
|
||||||
int tqDropGroup(STQ*, int cgId);
|
int tqDropTCGroup(STQ*, const char* topic, int cgId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,12 +22,65 @@
|
||||||
//
|
//
|
||||||
//handle management message
|
//handle management message
|
||||||
|
|
||||||
|
static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
|
||||||
|
//look in memory
|
||||||
|
//
|
||||||
|
//not found, try to restore from disk
|
||||||
|
//
|
||||||
|
//still not found
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tqCommitTCGroup(tqGroupHandle* handle) {
|
||||||
|
//persist into disk
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) {
|
||||||
|
int code;
|
||||||
|
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
|
||||||
|
if(handle == NULL) {
|
||||||
|
code = tqCreateTCGroup(pTq, topic, cgId, &handle);
|
||||||
|
if(code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(handle != NULL);
|
||||||
|
|
||||||
|
//put into STQ
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {
|
||||||
|
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
|
||||||
|
return tqCommitTCGroup(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) {
|
||||||
|
//delete from disk
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int tqPushMsg(STQ* pTq , void* p, int64_t version) {
|
int tqPushMsg(STQ* pTq , void* p, int64_t version) {
|
||||||
//add reference
|
//add reference
|
||||||
//
|
//judge and launch new query
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqCommit(STQ* pTq) {
|
int tqCommit(STQ* pTq) {
|
||||||
|
//do nothing
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) {
|
||||||
|
//parse msg and extract topic and cgId
|
||||||
|
//lookup handle
|
||||||
|
//confirm message and send to consumer
|
||||||
|
//judge and launch new query
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,206 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "theap.h"
|
||||||
|
|
||||||
|
size_t heapSize(Heap* heap) {
|
||||||
|
return heap->nelts;
|
||||||
|
}
|
||||||
|
|
||||||
|
Heap* heapCreate(HeapCompareFn fn) {
|
||||||
|
Heap *heap = calloc(1, sizeof(Heap));
|
||||||
|
if (heap == NULL) { return NULL; }
|
||||||
|
|
||||||
|
heap->min = NULL;
|
||||||
|
heap->nelts = 0;
|
||||||
|
heap->compFn = fn;
|
||||||
|
return heap;
|
||||||
|
}
|
||||||
|
|
||||||
|
void heapDestroy(Heap *heap) {
|
||||||
|
free(heap);
|
||||||
|
}
|
||||||
|
|
||||||
|
HeapNode* heapMin(const Heap* heap) {
|
||||||
|
return heap->min;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Swap parent with child. Child moves closer to the root, parent moves away. */
|
||||||
|
static void heapNodeSwap(Heap* heap, HeapNode* parent, HeapNode* child) {
|
||||||
|
HeapNode* sibling;
|
||||||
|
HeapNode t;
|
||||||
|
|
||||||
|
t = *parent;
|
||||||
|
*parent = *child;
|
||||||
|
*child = t;
|
||||||
|
|
||||||
|
parent->parent = child;
|
||||||
|
if (child->left == child) {
|
||||||
|
child->left = parent;
|
||||||
|
sibling = child->right;
|
||||||
|
} else {
|
||||||
|
child->right = parent;
|
||||||
|
sibling = child->left;
|
||||||
|
}
|
||||||
|
if (sibling != NULL)
|
||||||
|
sibling->parent = child;
|
||||||
|
|
||||||
|
if (parent->left != NULL)
|
||||||
|
parent->left->parent = parent;
|
||||||
|
if (parent->right != NULL)
|
||||||
|
parent->right->parent = parent;
|
||||||
|
|
||||||
|
if (child->parent == NULL)
|
||||||
|
heap->min = child;
|
||||||
|
else if (child->parent->left == parent)
|
||||||
|
child->parent->left = child;
|
||||||
|
else
|
||||||
|
child->parent->right = child;
|
||||||
|
}
|
||||||
|
|
||||||
|
void heapInsert(Heap* heap, HeapNode* newnode) {
|
||||||
|
HeapNode** parent;
|
||||||
|
HeapNode** child;
|
||||||
|
unsigned int path;
|
||||||
|
unsigned int n;
|
||||||
|
unsigned int k;
|
||||||
|
|
||||||
|
newnode->left = NULL;
|
||||||
|
newnode->right = NULL;
|
||||||
|
newnode->parent = NULL;
|
||||||
|
|
||||||
|
/* Calculate the path from the root to the insertion point. This is a min
|
||||||
|
* heap so we always insert at the left-most free node of the bottom row.
|
||||||
|
*/
|
||||||
|
path = 0;
|
||||||
|
for (k = 0, n = 1 + heap->nelts; n >= 2; k += 1, n /= 2)
|
||||||
|
path = (path << 1) | (n & 1);
|
||||||
|
|
||||||
|
/* Now traverse the heap using the path we calculated in the previous step. */
|
||||||
|
parent = child = &heap->min;
|
||||||
|
while (k > 0) {
|
||||||
|
parent = child;
|
||||||
|
if (path & 1)
|
||||||
|
child = &(*child)->right;
|
||||||
|
else
|
||||||
|
child = &(*child)->left;
|
||||||
|
path >>= 1;
|
||||||
|
k -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Insert the new node. */
|
||||||
|
newnode->parent = *parent;
|
||||||
|
*child = newnode;
|
||||||
|
heap->nelts += 1;
|
||||||
|
|
||||||
|
/* Walk up the tree and check at each node if the heap property holds.
|
||||||
|
* It's a min heap so parent < child must be true.
|
||||||
|
*/
|
||||||
|
while (newnode->parent != NULL && (heap->compFn)(newnode, newnode->parent))
|
||||||
|
heapNodeSwap(heap, newnode->parent, newnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void heapRemove(Heap* heap, HeapNode* node) {
|
||||||
|
HeapNode* smallest;
|
||||||
|
HeapNode** max;
|
||||||
|
HeapNode* child;
|
||||||
|
unsigned int path;
|
||||||
|
unsigned int k;
|
||||||
|
unsigned int n;
|
||||||
|
|
||||||
|
if (heap->nelts == 0)
|
||||||
|
return;
|
||||||
|
|
||||||
|
/* Calculate the path from the min (the root) to the max, the left-most node
|
||||||
|
* of the bottom row.
|
||||||
|
*/
|
||||||
|
path = 0;
|
||||||
|
for (k = 0, n = heap->nelts; n >= 2; k += 1, n /= 2)
|
||||||
|
path = (path << 1) | (n & 1);
|
||||||
|
|
||||||
|
/* Now traverse the heap using the path we calculated in the previous step. */
|
||||||
|
max = &heap->min;
|
||||||
|
while (k > 0) {
|
||||||
|
if (path & 1)
|
||||||
|
max = &(*max)->right;
|
||||||
|
else
|
||||||
|
max = &(*max)->left;
|
||||||
|
path >>= 1;
|
||||||
|
k -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
heap->nelts -= 1;
|
||||||
|
|
||||||
|
/* Unlink the max node. */
|
||||||
|
child = *max;
|
||||||
|
*max = NULL;
|
||||||
|
|
||||||
|
if (child == node) {
|
||||||
|
/* We're removing either the max or the last node in the tree. */
|
||||||
|
if (child == heap->min) {
|
||||||
|
heap->min = NULL;
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Replace the to be deleted node with the max node. */
|
||||||
|
child->left = node->left;
|
||||||
|
child->right = node->right;
|
||||||
|
child->parent = node->parent;
|
||||||
|
|
||||||
|
if (child->left != NULL) {
|
||||||
|
child->left->parent = child;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (child->right != NULL) {
|
||||||
|
child->right->parent = child;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (node->parent == NULL) {
|
||||||
|
heap->min = child;
|
||||||
|
} else if (node->parent->left == node) {
|
||||||
|
node->parent->left = child;
|
||||||
|
} else {
|
||||||
|
node->parent->right = child;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Walk down the subtree and check at each node if the heap property holds.
|
||||||
|
* It's a min heap so parent < child must be true. If the parent is bigger,
|
||||||
|
* swap it with the smallest child.
|
||||||
|
*/
|
||||||
|
for (;;) {
|
||||||
|
smallest = child;
|
||||||
|
if (child->left != NULL && (heap->compFn)(child->left, smallest))
|
||||||
|
smallest = child->left;
|
||||||
|
if (child->right != NULL && (heap->compFn)(child->right, smallest))
|
||||||
|
smallest = child->right;
|
||||||
|
if (smallest == child)
|
||||||
|
break;
|
||||||
|
heapNodeSwap(heap, child, smallest);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Walk up the subtree and check that each parent is less than the node
|
||||||
|
* this is required, because `max` node is not guaranteed to be the
|
||||||
|
* actual maximum in tree
|
||||||
|
*/
|
||||||
|
while (child->parent != NULL && (heap->compFn)(child, child->parent))
|
||||||
|
heapNodeSwap(heap, child->parent, child);
|
||||||
|
}
|
||||||
|
|
||||||
|
void heapDequeue(Heap* heap) {
|
||||||
|
heapRemove(heap, heap->min);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue