Skip to content

Home

This page documents the API of meshnetworkx.

Provides the GraphZ class for storing a NetworkX graph in Zenoh.

Usage

import meshnetworkx as mx

GraphZ

Represents a NetworkX graph stored in Zenoh.

Source code in src/meshnetworkx/__init__.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
class GraphZ:
    """Represents a NetworkX graph stored in Zenoh.

    Every node is written under its own key and every edge under a
    ``<a>/to/<b>`` key; both carry a pickled attribute dictionary as payload.
    Edges are undirected: the two endpoint names are ordered by their string
    form so that ``(u, v)`` and ``(v, u)`` always map to the same key.

    NOTE(security): payloads read back from Zenoh are passed to
    ``pickle.loads``. Only use this class with routers and peers you trust.
    """

    def __init__(self):
        """Initializes the GraphZ object and connects to the Zenoh router."""
        cfg = zenoh.Config()

        # Connect to explicitly configured endpoints instead of relying on
        # multicast scouting, which does not work in Docker outside of a
        # Linux host.
        cfg.insert_json5("connect/endpoints", json.dumps(_get_endpoints()))

        self._z = zenoh.open(cfg)

    @staticmethod
    def _edge_key(u: Any, v: Any) -> str:
        """Builds the storage key of the undirected edge between two nodes.

        The endpoints are converted with ``_try_str`` first (the same
        conversion ``has_edge`` uses) and then ordered, so the key does not
        depend on argument order or on whether a node was passed as e.g.
        ``9`` or ``"9"``.

        Args:
            u: One endpoint of the edge.
            v: The other endpoint of the edge.

        Returns:
            The key ``"<a>/to/<b>"`` with ``a`` ordered before ``b``.
        """
        a = _try_str(u)
        b = _try_str(v)
        return f"{a}/to/{b}" if a < b else f"{b}/to/{a}"

    @staticmethod
    def from_networkx(g: nx.Graph) -> "GraphZ":
        """Creates a GraphZ object from a NetworkX graph.

        Opens a new Zenoh session and writes every node and edge of ``g``
        (including their attributes) to it.

        Args:
            g: A NetworkX graph.

        Returns:
            A GraphZ object.
        """
        zg = GraphZ()
        for node, data in g.nodes(data=True):
            zg.add_node(node, **data)

        for u, v, data in g.edges(data=True):
            zg.add_edge(u, v, **data)

        return zg

    def to_networkx(self) -> nx.Graph:
        """Converts the GraphZ object to a NetworkX graph.

        Returns:
            A NetworkX graph holding the nodes and edges currently stored,
            together with their attributes.
        """
        g = nx.Graph()

        for node, data in self.nodes(data=True):
            g.add_node(node, **data)

        for u, v, data in self.edges(data=True):
            g.add_edge(u, v, **data)

        return g

    def add_node(self, node: Any, **attr) -> None:
        """Adds a node to the GraphZ object.

        Writing a node that already exists overwrites its stored attributes.

        Args:
            node: The node to add.
            attr: Additional attributes for the node.
        """
        _try_str(node)

        data_bytes = pickle.dumps(dict(attr))
        self._z.put(_totopic(node), data_bytes)
        # TODO: instead wait till we can read it back
        time.sleep(WAIT_TIME)

    # edge stuff

    def add_edge(self, u: Any, v: Any, **attr) -> None:
        """Adds an edge to the GraphZ object.

        Endpoints that do not exist yet are created without attributes.

        Args:
            u: The source node.
            v: The target node.
            attr: Additional attributes for the edge.
        """
        # Build the key up front: this converts both endpoints before
        # anything is written and keeps the key consistent with has_edge().
        key = self._edge_key(u, v)

        # check if the nodes exist, else create them
        if not self.has_node(u):
            self.add_node(u)
        if not self.has_node(v):
            self.add_node(v)

        data_bytes = pickle.dumps(dict(attr))
        self._z.put(_totopic(key), data_bytes)
        # TODO: instead wait till we can read it back
        time.sleep(WAIT_TIME)

    def remove_edge(self, u: Any, v: Any) -> None:
        """Removes an edge from the GraphZ object.

        Args:
            u: The source node.
            v: The target node.

        Raises:
            MeshNetworkXError: If there is no edge between ``u`` and ``v``.
        """
        key = self._edge_key(u, v)

        # has_edge() is symmetric, so a single lookup covers both directions.
        if not self.has_edge(u, v):
            raise MeshNetworkXError(f"Edge {u} to {v} does not exist")

        self._z.delete(_totopic(key))
        time.sleep(WAIT_TIME)

    def has_edge(self, u: Any, v: Any) -> bool:
        """Checks if an edge exists in the GraphZ object.

        Args:
            u: The source node.
            v: The target node.

        Returns:
            True if the edge exists, False otherwise.
        """
        u = _try_str(u)
        v = _try_str(v)

        # sort key alphabetically
        key = (u, v) if u < v else (v, u)

        return key in self.edges()

    @property
    def edges(self) -> nx.classes.reportviews.EdgeView:
        """Returns a view of the edges in the GraphZ object.

        The edges are fetched from Zenoh on every access and loaded into a
        temporary NetworkX graph whose edge view is returned. Call the view
        with ``data=True`` to get the edge attributes as well.

        Returns:
            A NetworkX edge view of the stored edges.

        Raises:
            MeshNetworkXError: If Zenoh answers with an error reply.
        """
        edges = []

        replies = self._z.get(
            _totopic("*/to/*"), handler=zenoh.handlers.DefaultHandler()
        )
        for reply in replies:
            if not reply.ok:
                raise MeshNetworkXError(f"Error: {reply.err.payload.to_string()}")

            # The key ends in "<a>/to/<b>": the last chunk and the third from
            # last are the two endpoint names.
            parts = str(reply.ok.key_expr).split("/")

            # NOTE(security): unpickling executes arbitrary code for
            # malicious payloads; the Zenoh network must be trusted.
            edge_data = pickle.loads(reply.ok.payload.to_bytes())

            edges.append((parts[-1], parts[-3], edge_data))

        G = nx.Graph()
        G.add_edges_from(edges)

        return G.edges

    @property
    def adj(self) -> dict[Any, dict[Any, dict[Any, Any]]]:
        """Returns the adjacency list of the GraphZ object.

        Only nodes that take part in at least one edge appear in the result,
        and the innermost dictionaries are always empty: edge attributes are
        not loaded here.

        Returns:
            The adjacency list, mapping node -> neighbor -> ``{}``.

        Raises:
            MeshNetworkXError: If Zenoh answers with an error reply.
        """
        adj = {}
        replies = self._z.get(
            _totopic("*/to/*"), handler=zenoh.handlers.DefaultHandler()
        )

        for reply in replies:
            if reply.err:
                raise MeshNetworkXError(f"Error: {reply.err.payload.to_string()}")

            if reply.ok:
                # The key ends in "<a>/to/<b>".
                parts = str(reply.ok.key_expr).split("/")
                u = parts[-1]
                v = parts[-3]

                # record the edge in both directions
                adj.setdefault(u, {})[v] = {}
                adj.setdefault(v, {})[u] = {}

        return adj

    def add_nodes_from(self, nodes: list[Any], **attr) -> None:
        """Add nodes from a list of nodes.

        Args:
            nodes: The nodes to add.
            **attr: The attributes to add to the nodes.
        """
        for node in nodes:
            self.add_node(node, **attr)

    def remove_nodes_from(self, nodes: list[Any]) -> None:
        """Removes nodes from the GraphZ object.

        Args:
            nodes: The nodes to remove.

        Raises:
            MeshNetworkXError: If one of the nodes does not exist.
        """
        for node in nodes:
            self.remove_node(node)

    def remove_node(self, node: Any) -> None:
        """Removes a node and the edges attached to it from the GraphZ object.

        Args:
            node: The node to remove.

        Raises:
            MeshNetworkXError: If the node does not exist.
        """
        # check if the node exists
        if not self.has_node(node):
            raise MeshNetworkXError(f"Node {node} does not exist")

        self._z.delete(_totopic(node))
        # The node can be on either side of an edge key.
        self._z.delete(_totopic(f"{node}/to/*"))
        self._z.delete(_totopic(f"*/to/{node}"))
        time.sleep(WAIT_TIME)

    def has_node(self, node: Any) -> bool:
        """Checks if a node exists in the GraphZ object.

        Args:
            node: The node to check.

        Returns:
            True if the node exists, False otherwise.
        """
        _try_str(node)
        return str(node) in self.nodes()

    @property
    def nodes(self) -> NodeView:
        """Returns a view of the nodes in the GraphZ object.

        The nodes are fetched from Zenoh on every access. The returned view
        can be indexed by node name to get that node's attributes, or called:
        ``nodes()`` yields the node names and ``nodes(data=True)`` yields
        ``(name, attributes)`` pairs.

        Returns:
            A NodeView over the stored nodes and their attributes.

        Raises:
            MeshNetworkXError: If Zenoh answers with an error reply.
        """
        nodes = {}

        replies = self._z.get(_totopic("*"), handler=zenoh.handlers.DefaultHandler())
        for reply in replies:
            if not reply.ok:
                raise MeshNetworkXError(f"Error: {reply.err.payload.to_string()}")

            # the last part is the node name
            node = str(reply.ok.key_expr).split("/")[-1]
            # NOTE(security): unpickling executes arbitrary code for
            # malicious payloads; the Zenoh network must be trusted.
            node_data = pickle.loads(reply.ok.payload.to_bytes())

            nodes[node] = node_data

        return NodeView(nodes)

    def clear(self) -> None:
        """Clears all nodes and edges from the GraphZ object."""
        self._z.delete(_totopic("**"))
        time.sleep(WAIT_TIME)

    def close(self) -> None:
        """Closes the connection to the Zenoh router."""
        self._z.close()

    def __iter__(self):
        """Returns an iterator over the nodes in the GraphZ object.

        Returns:
            An iterator over the node names.
        """
        return iter(self.nodes())

    def draw(self, block: bool = True) -> None:
        """Draws the GraphZ object using NetworkX.

        Args:
            block: If True, blocks the drawing window. If False, does not block.
        """
        nxg = self.to_networkx()
        nx.draw(nxg)
        plt.show(block=block)

adj: dict[Any, dict[Any, dict[Any, Any]]] property

Returns the adjacency list of the GraphZ object.

Returns:

Type Description
dict[Any, dict[Any, dict[Any, Any]]]

The adjacency list.

edges: nx.classes.reportviews.EdgeView property

Returns a NetworkX edge view over the edges stored in the GraphZ object.

Returns:

Type Description
EdgeView

An EdgeView of the stored edges.

nodes: NodeView property

Returns a list of nodes in the GraphZ object.

Parameters:

Name Type Description Default
data

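Not an argument of the property itself: pass data=True when calling the returned view (nodes(data=True)) to get (node, data) pairs; without it only the node names are returned.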

required

Returns:

Type Description
NodeView

A list of nodes or a list of tuples containing nodes and their data.

__init__()

Initializes the GraphZ object and connects to the Zenoh router.

Source code in src/meshnetworkx/__init__.py
71
72
73
74
75
76
77
78
79
80
def __init__(self):
    """Open a Zenoh session for this graph.

    The session is configured to connect to the endpoints returned by
    ``_get_endpoints()`` and is kept on ``self._z``.
    """
    config = zenoh.Config()

    # Multicast scouting does not work in Docker outside of a Linux host,
    # so the endpoints to connect to are configured explicitly.
    endpoints_json = json.dumps(_get_endpoints())
    config.insert_json5("connect/endpoints", endpoints_json)

    self._z = zenoh.open(config)

__iter__()

Returns an iterator over the nodes in the GraphZ object.

Returns:

Type Description

An iterator over the nodes.

Source code in src/meshnetworkx/__init__.py
347
348
349
350
351
352
353
def __iter__(self):
    """Iterate over the graph's nodes.

    Returns:
        An iterator over whatever ``self.nodes()`` yields (the node names).
    """
    node_names = self.nodes()
    return iter(node_names)

add_edge(u, v, **attr)

Adds an edge to the GraphZ object.

Parameters:

Name Type Description Default
u Any

The source node.

required
v Any

The target node.

required
attr

Additional attributes for the edge.

{}
Source code in src/meshnetworkx/__init__.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def add_edge(self, u: Any, v: Any, **attr) -> None:
    """Adds an edge to the GraphZ object.

    Endpoints that do not exist yet are created without attributes. The edge
    is undirected: it is stored under a single ``<a>/to/<b>`` key whose two
    names are ordered by their string form.

    Args:
        u: The source node.
        v: The target node.
        attr: Additional attributes for the edge.
    """
    # Order the key on the converted names (as has_edge() does) rather than
    # on the raw arguments: 9 < 10 but "10" < "9", and comparing an int with
    # a str raises TypeError.
    a = _try_str(u)
    b = _try_str(v)
    key = f"{a}/to/{b}" if a < b else f"{b}/to/{a}"

    # check if the nodes exist, else create them
    if not self.has_node(u):
        self.add_node(u)
    if not self.has_node(v):
        self.add_node(v)

    data_bytes = pickle.dumps(dict(attr))
    self._z.put(_totopic(key), data_bytes)
    # TODO: instead wait till we can read it back
    time.sleep(WAIT_TIME)

add_node(node, **attr)

Adds a node to the GraphZ object.

Parameters:

Name Type Description Default
node Any

The node to add.

required
attr

Additional attributes for the node.

{}
Source code in src/meshnetworkx/__init__.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def add_node(self, node: Any, **attr) -> None:
    """Store a node and its attributes in Zenoh.

    The attributes are pickled and written under the node's key; a node that
    already exists is overwritten.

    Args:
        node: The node to add.
        attr: Additional attributes for the node.
    """
    _try_str(node)

    payload = pickle.dumps(dict(attr))
    self._z.put(_totopic(node), payload)

    # Fixed pause after the write.
    # TODO: instead wait till we can read it back
    time.sleep(WAIT_TIME)

add_nodes_from(nodes, **attr)

Add nodes from a list of nodes.

Parameters:

Name Type Description Default
nodes list[Any]

The nodes to add.

required
**attr

The attributes to add to the nodes.

{}
Source code in src/meshnetworkx/__init__.py
264
265
266
267
268
269
270
271
272
def add_nodes_from(self, nodes: list[Any], **attr) -> None:
    """Add every node of an iterable, all with the same attributes.

    Args:
        nodes: The nodes to add.
        **attr: The attributes given to each added node.
    """
    for node_id in nodes:
        self.add_node(node_id, **attr)

clear()

Clears all nodes and edges from the GraphZ object.

Source code in src/meshnetworkx/__init__.py
338
339
340
341
def clear(self) -> None:
    """Delete everything stored for this graph, then pause for WAIT_TIME."""
    everything = _totopic("**")
    self._z.delete(everything)
    time.sleep(WAIT_TIME)

close()

Closes the connection to the Zenoh router.

Source code in src/meshnetworkx/__init__.py
343
344
345
def close(self) -> None:
    """Close the Zenoh session held by this object."""
    session = self._z
    session.close()

draw(block=True)

Draws the GraphZ object using NetworkX.

Parameters:

Name Type Description Default
block bool

If True, blocks the drawing window. If False, does not block.

True
Source code in src/meshnetworkx/__init__.py
355
356
357
358
359
360
361
362
363
def draw(self, block: bool = True) -> None:
    """Render the graph with NetworkX and show the matplotlib window.

    Args:
        block: Passed to ``plt.show``; if True the call blocks until the
            window is closed, if False it returns immediately.
    """
    nx.draw(self.to_networkx())
    plt.show(block=block)

from_networkx(g) staticmethod

Creates a GraphZ object from a NetworkX graph.

Parameters:

Name Type Description Default
g Graph

A NetworkX graph.

required

Returns:

Type Description
GraphZ

A GraphZ object.

Source code in src/meshnetworkx/__init__.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@staticmethod
def from_networkx(g: nx.Graph) -> "GraphZ":
    """Build a GraphZ holding the nodes and edges of a NetworkX graph.

    A new GraphZ (and with it a new Zenoh session) is created; nodes are
    written first, then edges, each with its attributes.

    Args:
        g: A NetworkX graph.

    Returns:
        A GraphZ object.
    """
    graph_z = GraphZ()

    for name, node_attrs in g.nodes(data=True):
        graph_z.add_node(name, **node_attrs)

    for source, target, edge_attrs in g.edges(data=True):
        graph_z.add_edge(source, target, **edge_attrs)

    return graph_z

has_edge(u, v)

Checks if an edge exists in the GraphZ object.

Parameters:

Name Type Description Default
u Any

The source node.

required
v Any

The target node.

required

Returns:

Type Description
bool

True if the edge exists, False otherwise.

Source code in src/meshnetworkx/__init__.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def has_edge(self, u: Any, v: Any) -> bool:
    """Tell whether an edge between two nodes is stored.

    Args:
        u: The source node.
        v: The target node.

    Returns:
        True if the edge exists, False otherwise.
    """
    first = _try_str(u)
    second = _try_str(v)

    # Look the pair up in sorted order.
    if first < second:
        ordered = (first, second)
    else:
        ordered = (second, first)

    return ordered in self.edges()

has_node(node)

Checks if a node exists in the GraphZ object.

Parameters:

Name Type Description Default
node Any

The node to check.

required

Returns:

Type Description
bool

True if the node exists, False otherwise.

Source code in src/meshnetworkx/__init__.py
298
299
300
301
302
303
304
305
306
307
308
def has_node(self, node: Any) -> bool:
    """Tell whether a node is stored.

    The lookup is done on the node's string form.

    Args:
        node: The node to check.

    Returns:
        True if the node exists, False otherwise.
    """
    _try_str(node)
    name = str(node)
    return name in self.nodes()

remove_edge(u, v)

Removes an edge from the GraphZ object.

Parameters:

Name Type Description Default
u Any

The source node.

required
v Any

The target node.

required
Source code in src/meshnetworkx/__init__.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def remove_edge(self, u: Any, v: Any) -> None:
    """Removes an edge from the GraphZ object.

    Args:
        u: The source node.
        v: The target node.

    Raises:
        MeshNetworkXError: If there is no edge between ``u`` and ``v``.
    """
    # Order the key on the converted names (as has_edge() does) rather than
    # on the raw arguments, so the deleted key matches the stored one no
    # matter whether a node is passed as e.g. 9 or "9".
    a = _try_str(u)
    b = _try_str(v)
    key = f"{a}/to/{b}" if a < b else f"{b}/to/{a}"

    # has_edge() sorts its arguments itself, so a single lookup covers both
    # directions.
    if not self.has_edge(u, v):
        raise MeshNetworkXError(f"Edge {u} to {v} does not exist")

    self._z.delete(_totopic(key))
    time.sleep(WAIT_TIME)

remove_node(node)

Removes a node from the GraphZ object.

Parameters:

Name Type Description Default
node Any

The node to remove.

required
Source code in src/meshnetworkx/__init__.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def remove_node(self, node: Any) -> None:
    """Delete a node together with the edges attached to it.

    Args:
        node: The node to remove.

    Raises:
        MeshNetworkXError: If the node does not exist.
    """
    if not self.has_node(node):
        raise MeshNetworkXError(f"Node {node} does not exist")

    # The node's own key first ...
    self._z.delete(_totopic(node))
    # ... then every edge key that has the node on either side.
    for edge_pattern in (f"{node}/to/*", f"*/to/{node}"):
        self._z.delete(_totopic(edge_pattern))

    time.sleep(WAIT_TIME)

remove_nodes_from(nodes)

Removes nodes from the GraphZ object.

Parameters:

Name Type Description Default
nodes list[Any]

The nodes to remove.

required
Source code in src/meshnetworkx/__init__.py
274
275
276
277
278
279
280
281
def remove_nodes_from(self, nodes: list[Any]) -> None:
    """Remove every node of an iterable, one ``remove_node`` call each.

    Args:
        nodes: The nodes to remove.
    """
    for node_id in nodes:
        self.remove_node(node_id)

to_networkx()

Converts the GraphZ object to a NetworkX graph.

Returns:

Type Description
Graph

A NetworkX graph.

Source code in src/meshnetworkx/__init__.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def to_networkx(self) -> nx.Graph:
    """Copy the stored nodes and edges into a new NetworkX graph.

    Returns:
        A NetworkX graph.
    """
    graph = nx.Graph()

    # Nodes first, so that their attributes are carried over.
    for name, node_attrs in self.nodes(data=True):
        graph.add_node(name, **node_attrs)

    for source, target, edge_attrs in self.edges(data=True):
        graph.add_edge(source, target, **edge_attrs)

    return graph

MeshNetworkXError

Bases: Exception

General exception for MeshNetworkX errors.

Source code in src/meshnetworkx/__init__.py
22
23
24
25
class MeshNetworkXError(Exception):
    """Base error raised by MeshNetworkX operations."""

NodeView

Provides a read only view for accessing node data in a dictionary-like manner.

Source code in src/meshnetworkx/__init__.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class NodeView:
    """Provides a read only view for accessing node data in a dictionary-like manner.

    The view wraps the dictionary it is given (it does not copy it) and
    supports indexing, iteration, ``len()`` and ``in``. Calling the view
    returns the node names, or ``(name, data)`` pairs with ``data=True``.
    """

    def __init__(self, node_data: dict[Any, Any]):
        """Initializes the NodeView object with node data.

        Args:
            node_data: A dictionary mapping each node to its data.
        """
        self._node_data = node_data  # Dictionary to store node data

    def __getitem__(self, key: Any):
        """Allow dictionary-like access.

        Raises:
            KeyError: If ``key`` is not a node in the view.
        """
        return self._node_data[key]

    def __setitem__(self, key: Any, value: Any):
        """Prevent assignment to the NodeView.

        Raises:
            TypeError: Always.
        """
        raise TypeError("NodeView object does not support item assignment")

    # Without __iter__/__contains__, Python falls back to calling
    # __getitem__(0), __getitem__(1), ... for iteration and ``in``, which
    # raises KeyError on a dict-backed view. Define them explicitly.

    def __iter__(self):
        """Iterate over the node identifiers."""
        return iter(self._node_data)

    def __len__(self) -> int:
        """Return the number of nodes in the view."""
        return len(self._node_data)

    def __contains__(self, key: Any) -> bool:
        """Return True if ``key`` is a node in the view."""
        return key in self._node_data

    def __call__(self, data: bool = False):
        """Method-like access with optional arguments.

        Args:
            data: If True, return ``(node, data)`` pairs; otherwise return
                only the node identifiers.

        Returns:
            The underlying dictionary's items view or keys view.
        """
        if data:
            return self._node_data.items()  # Return nodes with data
        return self._node_data.keys()  # Return just node identifiers

__call__(data=False)

Method-like access with optional arguments.

Source code in src/meshnetworkx/__init__.py
61
62
63
64
65
def __call__(self, data: bool = False):
    """Return the node names, or ``(name, data)`` pairs when ``data`` is true."""
    return self._node_data.items() if data else self._node_data.keys()

__getitem__(key)

Allow dictionary-like access.

Source code in src/meshnetworkx/__init__.py
53
54
55
def __getitem__(self, key: Any):
    """Look up the data stored for ``key``, dictionary style."""
    stored = self._node_data[key]
    return stored

__init__(node_data)

Initializes the NodeView object with node data.

Parameters:

Name Type Description Default
node_data dict[Any, Any]

A dictionary containing node data.

required
Source code in src/meshnetworkx/__init__.py
45
46
47
48
49
50
51
def __init__(self, node_data: dict[Any, Any]) -> None:
    """Initializes the NodeView object with node data.

    The dictionary is stored as given; it is not copied.

    Args:
        node_data: A dictionary containing node data.
    """
    self._node_data = node_data  # Backing dictionary read by the other methods

__setitem__(key, value)

Prevent assignment to the NodeView.

Source code in src/meshnetworkx/__init__.py
57
58
59
def __setitem__(self, key: Any, value: Any):
    """Reject item assignment: the view is read only.

    Raises:
        TypeError: Always.
    """
    message = "NodeView object does not support item assignment"
    raise TypeError(message)