From c635e77d4e7112a3763723e83cb3eb6675c029be Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 13 Oct 2021 13:41:04 +0800 Subject: [PATCH 1/6] add vnode's dependency on wal --- source/server/vnode/inc/vnodeInt.h | 2 +- source/server/vnode/inc/vnodeMain.h | 3 ++- source/server/vnode/inc/vnodeWrite.h | 4 ++-- source/server/vnode/tq/CMakeLists.txt | 3 ++- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index b2512b2892..70796c76bd 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -83,7 +83,7 @@ typedef struct { #if 0 SSyncCfg syncCfg; #endif - SWalCfg walCfg; + //SWalCfg walCfg; void * qMgmt; char * rootDir; tsem_t sem; diff --git a/source/server/vnode/inc/vnodeMain.h b/source/server/vnode/inc/vnodeMain.h index 093d07b013..0b41812215 100644 --- a/source/server/vnode/inc/vnodeMain.h +++ b/source/server/vnode/inc/vnodeMain.h @@ -16,10 +16,11 @@ #ifndef _TD_VNODE_MAIN_H_ #define _TD_VNODE_MAIN_H_ +#include "vnodeInt.h" + #ifdef __cplusplus extern "C" { #endif -#include "vnodeInt.h" int32_t vnodeInitMain(); void vnodeCleanupMain(); diff --git a/source/server/vnode/inc/vnodeWrite.h b/source/server/vnode/inc/vnodeWrite.h index 48acf750c1..4bbe0d88fa 100644 --- a/source/server/vnode/inc/vnodeWrite.h +++ b/source/server/vnode/inc/vnodeWrite.h @@ -27,7 +27,7 @@ taos_queue vnodeAllocWriteQueue(SVnode *pVnode); void vnodeFreeWriteQueue(taos_queue pQueue); void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg); -int32_t vnodeProcessWalMsg(SVnode *pVnode, SWalHead *pHead); +//int32_t vnodeProcessWalMsg(SVnode *pVnode, SWalHead *pHead); void vnodeStartWrite(SVnode *pVnode); void vnodeStopWrite(SVnode *pVnode); @@ -37,4 +37,4 @@ void vnodeWaitWriteCompleted(SVnode *pVnode); } #endif -#endif /*_TD_VNODE_WRITE_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_WRITE_H_*/ diff --git a/source/server/vnode/tq/CMakeLists.txt b/source/server/vnode/tq/CMakeLists.txt index 9577007400..0c15e23d33 100644 --- a/source/server/vnode/tq/CMakeLists.txt +++ b/source/server/vnode/tq/CMakeLists.txt @@ -3,10 +3,11 @@ add_library(tq ${TQ_SRC}) target_include_directories( tq PUBLIC "${CMAKE_SOURCE_DIR}/include/server/vnode/tq" + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_SOURCE_DIR}/include/os" ) target_link_libraries( - os + wal ) From fd80f0b232f0ad16b04370537e38b09133e011a6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 13 Oct 2021 13:45:19 +0800 Subject: [PATCH 2/6] add wal back to vnode --- source/server/vnode/inc/vnodeInt.h | 2 +- source/server/vnode/inc/vnodeWrite.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/server/vnode/inc/vnodeInt.h b/source/server/vnode/inc/vnodeInt.h index 70796c76bd..b2512b2892 100644 --- a/source/server/vnode/inc/vnodeInt.h +++ b/source/server/vnode/inc/vnodeInt.h @@ -83,7 +83,7 @@ typedef struct { #if 0 SSyncCfg syncCfg; #endif - //SWalCfg walCfg; + SWalCfg walCfg; void * qMgmt; char * rootDir; tsem_t sem; diff --git a/source/server/vnode/inc/vnodeWrite.h b/source/server/vnode/inc/vnodeWrite.h index 4bbe0d88fa..0bb670de5b 100644 --- a/source/server/vnode/inc/vnodeWrite.h +++ b/source/server/vnode/inc/vnodeWrite.h @@ -27,7 +27,7 @@ taos_queue vnodeAllocWriteQueue(SVnode *pVnode); void vnodeFreeWriteQueue(taos_queue pQueue); void vnodeProcessWriteMsg(SRpcMsg *pRpcMsg); -//int32_t vnodeProcessWalMsg(SVnode *pVnode, SWalHead *pHead); +int32_t vnodeProcessWalMsg(SVnode *pVnode, SWalHead *pHead); void vnodeStartWrite(SVnode *pVnode); void vnodeStopWrite(SVnode *pVnode); From dd9ac9bbb90011b81f49c170da85a48843c9a5fb Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 13 Oct 2021 14:25:11 +0800 Subject: [PATCH 3/6] refine wal interface --- include/libs/wal/wal.h | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 37cd263783..b6fd5a70d9 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -58,18 +58,25 @@ void walStop(twalh); void walClose(twalh); //write -int64_t walWrite(twalh, int8_t msgType, void* body, uint32_t bodyLen); -void walFsync(twalh, bool forceHint); -//int32_t walCommit(twalh, int64_t ver); -//int32_t walRollback(twalh, int64_t ver); +int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); +int64_t walWrite(twalh, void* body, int32_t bodyLen); +int64_t walWriteBatch(twalh, void* body, int32_t* bodyLen, int32_t batchSize); + +//apis for lifecycle management +void walFsync(twalh, bool force); +int32_t walCommit(twalh, int64_t ver); +//truncate after +int32_t walRollback(twalh, int64_t ver); +//notify that previous log can be pruned safely +int32_t walPrune(twalh, int64_t ver); //read int32_t walRead(twalh, SWalHead **, int64_t ver); int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum); -//life cycle -int32_t walDataPersisted(twalh, int64_t ver); +//lifecycle check int32_t walFirstVer(twalh); +int32_t walPersistedVer(twalh); int32_t walLastVer(twalh); //int32_t walDataCorrupted(twalh); From a1fbaf30ab6491ca27dfdd46cc109ad5bc9da113 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 13 Oct 2021 17:53:10 +0800 Subject: [PATCH 4/6] tq data structure defined --- .gitignore | 4 ++- include/server/vnode/tq/tq.h | 20 ++++++++++- source/server/vnode/tq/inc/tqInt.h | 27 +++++++++++---- source/server/vnode/tq/src/tq.c | 55 +++++++++++++++++++++++++++++- 4 files changed, 97 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 5141448ee0..0b98a1b161 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ build/ +compile_commands.json +.cache .ycm_extra_conf.py .vscode/ .idea/ @@ -96,4 +98,4 @@ tramp TAGS deps/* -!deps/CMakeLists.txt \ No newline at end of file +!deps/CMakeLists.txt diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index dd355c8381..eb9c57c581 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,8 +22,26 @@ extern "C" { #endif -typedef struct STQ STQ; +typedef struct tqTopicVhandle { + //name + // + //executor for filter + // + //callback for mnode + // +} tqTopic; +typedef struct STQ { + //the set for topics + //key=topicName: str + //value=tqTopicVhandle + + //a map + //key= + //value=consumeOffset: int64_t +} STQ; + +//init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index a51f0b03af..c42bcfef43 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -18,23 +18,38 @@ #include "tq.h" +#define TQ_BUFFER_SIZE 8 + #ifdef __cplusplus extern "C" { #endif -//implement the array index -//implement the ring buffer +typedef struct tqBufferItem { + int64_t offset; + void *content; +} tqBufferItem; + + +typedef struct tqGroupHandle { + char* topic; + void* ahandle; + int64_t cgId; + int64_t consumeOffset; + int32_t head; + int32_t tail; + tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqGroupHandle; //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code -int tqCreateGroup(STQ*); +int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); //create ring buffer in memory and load consuming offset -int tqOpenGroup(STQ*, int cgId); +int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseGroup(STQ*, int cgId); +int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info -int tqDropGroup(STQ*, int cgId); +int tqDropTCGroup(STQ*, const char* topic, int cgId); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 3255f3fb3a..2ef2a4b6ea 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,12 +22,65 @@ // //handle management message +static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { + //look in memory + // + //not found, try to restore from disk + // + //still not found + return NULL; +} + +static int tqCommitTCGroup(tqGroupHandle* handle) { + //persist into disk + return 0; +} + +int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) { + return 0; +} + +int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { + int code; + tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); + if(handle == NULL) { + code = tqCreateTCGroup(pTq, topic, cgId, &handle); + if(code != 0) { + return code; + } + } + ASSERT(handle != NULL); + + //put into STQ + + return 0; +} + +int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) { + tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); + return tqCommitTCGroup(handle); +} + +int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { + //delete from disk + return 0; +} + int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference - // + //judge and launch new query return 0; } int tqCommit(STQ* pTq) { + //do nothing + return 0; +} + +int tqHandleMsg(STQ* pTq, void*msg) { + //parse msg and extract topic and cgId + //lookup handle + //confirm message and send to consumer + //judge and launch new query return 0; } From 69a4417a0c5b971b8a29494847d865c66ae8e742 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 14 Oct 2021 16:50:07 +0800 Subject: [PATCH 5/6] add some msg for tq (#8241) --- include/common/taosmsg.h | 6 +++- include/server/vnode/tq/tq.h | 54 ++++++++++++++++++++++++++++-- source/server/vnode/tq/inc/tqInt.h | 7 ++-- source/server/vnode/tq/src/tq.c | 2 +- 4 files changed, 62 insertions(+), 7 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 8f89df40d0..78f91cca64 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -41,6 +41,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) @@ -113,7 +117,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) // message for topic TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) +//TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) #ifndef TAOS_MESSAGE_C diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index eb9c57c581..ef6a34ffa3 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,6 +22,56 @@ extern "C" { #endif +typedef struct tmqMsgHead { + int32_t headLen; + int32_t msgVer; + int64_t cgId; + int32_t topicLen; + char topic[]; +} tmqMsgHead; + +//TODO: put msgs into common +typedef struct tmqConnectReq { + tmqMsgHead head; + +} tmqConnectReq; + +typedef struct tmqConnectResp { + +} tmqConnectResp; + +typedef struct tmqDisconnectReq { + +} tmqDisconnectReq; + +typedef struct tmqDisconnectResp { + +} tmqDiconnectResp; + +typedef struct tmqConsumeReq { + +} tmqConsumeReq; + +typedef struct tmqConsumeResp { + +} tmqConsumeResp; + +typedef struct tmqSubscribeReq { + +} tmqSubscribeReq; + +typedef struct tmqSubscribeResp { + +} tmqSubscribeResp; + +typedef struct tmqHeartbeatReq { + +} tmqHeartbeatReq; + +typedef struct tmqHeartbeatResp { + +} tmqHeartbeatResp; + typedef struct tqTopicVhandle { //name // @@ -29,7 +79,7 @@ typedef struct tqTopicVhandle { // //callback for mnode // -} tqTopic; +} tqTopicVhandle; typedef struct STQ { //the set for topics @@ -50,7 +100,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); //void* will be replace by a msg type -int tqHandleMsg(STQ*, void* msg); +int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index c42bcfef43..cba9075fe9 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -26,14 +26,15 @@ extern "C" { typedef struct tqBufferItem { int64_t offset; - void *content; + void* executor; + void* content; } tqBufferItem; typedef struct tqGroupHandle { - char* topic; - void* ahandle; + char* topic; //c style, end with '\0' int64_t cgId; + void* ahandle; int64_t consumeOffset; int32_t head; int32_t tail; diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 2ef2a4b6ea..7733ac29b5 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -77,7 +77,7 @@ int tqCommit(STQ* pTq) { return 0; } -int tqHandleMsg(STQ* pTq, void*msg) { +int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) { //parse msg and extract topic and cgId //lookup handle //confirm message and send to consumer From ad61d062ffd36fbaddafcde8f04b24088c8d37b5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 14 Oct 2021 23:22:16 +0800 Subject: [PATCH 6/6] add priority queue --- include/util/theap.h | 65 +++++++++++++ source/util/src/theap.c | 206 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 271 insertions(+) create mode 100644 include/util/theap.h create mode 100644 source/util/src/theap.c diff --git a/include/util/theap.h b/include/util/theap.h new file mode 100644 index 0000000000..fd1a39f8dd --- /dev/null +++ b/include/util/theap.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef TDENGINE_HEAP_H +#define TDENGINE_HEAP_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" + +struct HeapNode; + +/* Return non-zero if a < b. */ +typedef int (*HeapCompareFn)(const struct HeapNode* a, const struct HeapNode* b); + +typedef struct HeapNode { + struct HeapNode* left; + struct HeapNode* right; + struct HeapNode* parent; +} HeapNode; + +/* A binary min heap. The usual properties hold: the root is the lowest + * element in the set, the height of the tree is at most log2(nodes) and + * it's always a complete binary tree. + * + */ +typedef struct { + HeapNode* min; + size_t nelts; + HeapCompareFn compFn; +} Heap; + + +Heap* heapCreate(HeapCompareFn fn); + +void heapDestroy(Heap *heap); + +HeapNode* heapMin(const Heap* heap); + +void heapInsert(Heap* heap, HeapNode* node); + +void heapRemove(Heap* heap, struct HeapNode* node); + +void heapDequeue(Heap* heap); + +size_t heapSize(Heap *heap); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_HASH_H diff --git a/source/util/src/theap.c b/source/util/src/theap.c new file mode 100644 index 0000000000..aa822c7d5e --- /dev/null +++ b/source/util/src/theap.c @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "theap.h" + +size_t heapSize(Heap* heap) { + return heap->nelts; +} + +Heap* heapCreate(HeapCompareFn fn) { + Heap *heap = calloc(1, sizeof(Heap)); + if (heap == NULL) { return NULL; } + + heap->min = NULL; + heap->nelts = 0; + heap->compFn = fn; + return heap; +} + +void heapDestroy(Heap *heap) { + free(heap); +} + +HeapNode* heapMin(const Heap* heap) { + return heap->min; +} + +/* Swap parent with child. Child moves closer to the root, parent moves away. */ +static void heapNodeSwap(Heap* heap, HeapNode* parent, HeapNode* child) { + HeapNode* sibling; + HeapNode t; + + t = *parent; + *parent = *child; + *child = t; + + parent->parent = child; + if (child->left == child) { + child->left = parent; + sibling = child->right; + } else { + child->right = parent; + sibling = child->left; + } + if (sibling != NULL) + sibling->parent = child; + + if (parent->left != NULL) + parent->left->parent = parent; + if (parent->right != NULL) + parent->right->parent = parent; + + if (child->parent == NULL) + heap->min = child; + else if (child->parent->left == parent) + child->parent->left = child; + else + child->parent->right = child; +} + +void heapInsert(Heap* heap, HeapNode* newnode) { + HeapNode** parent; + HeapNode** child; + unsigned int path; + unsigned int n; + unsigned int k; + + newnode->left = NULL; + newnode->right = NULL; + newnode->parent = NULL; + + /* Calculate the path from the root to the insertion point. This is a min + * heap so we always insert at the left-most free node of the bottom row. + */ + path = 0; + for (k = 0, n = 1 + heap->nelts; n >= 2; k += 1, n /= 2) + path = (path << 1) | (n & 1); + + /* Now traverse the heap using the path we calculated in the previous step. */ + parent = child = &heap->min; + while (k > 0) { + parent = child; + if (path & 1) + child = &(*child)->right; + else + child = &(*child)->left; + path >>= 1; + k -= 1; + } + + /* Insert the new node. */ + newnode->parent = *parent; + *child = newnode; + heap->nelts += 1; + + /* Walk up the tree and check at each node if the heap property holds. + * It's a min heap so parent < child must be true. + */ + while (newnode->parent != NULL && (heap->compFn)(newnode, newnode->parent)) + heapNodeSwap(heap, newnode->parent, newnode); +} + +void heapRemove(Heap* heap, HeapNode* node) { + HeapNode* smallest; + HeapNode** max; + HeapNode* child; + unsigned int path; + unsigned int k; + unsigned int n; + + if (heap->nelts == 0) + return; + + /* Calculate the path from the min (the root) to the max, the left-most node + * of the bottom row. + */ + path = 0; + for (k = 0, n = heap->nelts; n >= 2; k += 1, n /= 2) + path = (path << 1) | (n & 1); + + /* Now traverse the heap using the path we calculated in the previous step. */ + max = &heap->min; + while (k > 0) { + if (path & 1) + max = &(*max)->right; + else + max = &(*max)->left; + path >>= 1; + k -= 1; + } + + heap->nelts -= 1; + + /* Unlink the max node. */ + child = *max; + *max = NULL; + + if (child == node) { + /* We're removing either the max or the last node in the tree. */ + if (child == heap->min) { + heap->min = NULL; + } + return; + } + + /* Replace the to be deleted node with the max node. */ + child->left = node->left; + child->right = node->right; + child->parent = node->parent; + + if (child->left != NULL) { + child->left->parent = child; + } + + if (child->right != NULL) { + child->right->parent = child; + } + + if (node->parent == NULL) { + heap->min = child; + } else if (node->parent->left == node) { + node->parent->left = child; + } else { + node->parent->right = child; + } + + /* Walk down the subtree and check at each node if the heap property holds. + * It's a min heap so parent < child must be true. If the parent is bigger, + * swap it with the smallest child. + */ + for (;;) { + smallest = child; + if (child->left != NULL && (heap->compFn)(child->left, smallest)) + smallest = child->left; + if (child->right != NULL && (heap->compFn)(child->right, smallest)) + smallest = child->right; + if (smallest == child) + break; + heapNodeSwap(heap, child, smallest); + } + + /* Walk up the subtree and check that each parent is less than the node + * this is required, because `max` node is not guaranteed to be the + * actual maximum in tree + */ + while (child->parent != NULL && (heap->compFn)(child, child->parent)) + heapNodeSwap(heap, child->parent, child); +} + +void heapDequeue(Heap* heap) { + heapRemove(heap, heap->min); +} + +