| 1 | /* |
| 2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
| 3 | |
| 4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
| 5 | |
| 6 | libzmq is free software; you can redistribute it and/or modify it under |
| 7 | the terms of the GNU Lesser General Public License (LGPL) as published |
| 8 | by the Free Software Foundation; either version 3 of the License, or |
| 9 | (at your option) any later version. |
| 10 | |
| 11 | As a special exception, the Contributors give you permission to link |
| 12 | this library with independent modules to produce an executable, |
| 13 | regardless of the license terms of these independent modules, and to |
| 14 | copy and distribute the resulting executable under terms of your choice, |
| 15 | provided that you also meet, for each linked independent module, the |
| 16 | terms and conditions of the license of that module. An independent |
| 17 | module is a module which is not derived from or based on this library. |
| 18 | If you modify this library, you must extend this exception to your |
| 19 | version of the library. |
| 20 | |
| 21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
| 22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
| 24 | License for more details. |
| 25 | |
| 26 | You should have received a copy of the GNU Lesser General Public License |
| 27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 28 | */ |
| 29 | |
| 30 | #include "precompiled.hpp" |
| 31 | #include <new> |
| 32 | #include <stddef.h> |
| 33 | |
| 34 | #include "macros.hpp" |
| 35 | #include "pipe.hpp" |
| 36 | #include "err.hpp" |
| 37 | |
| 38 | #include "ypipe.hpp" |
| 39 | #include "ypipe_conflate.hpp" |
| 40 | |
| 41 | int zmq::pipepair (class object_t *parents_[2], |
| 42 | class pipe_t *pipes_[2], |
| 43 | int hwms_[2], |
| 44 | bool conflate_[2]) |
| 45 | { |
| 46 | // Creates two pipe objects. These objects are connected by two ypipes, |
| 47 | // each to pass messages in one direction. |
| 48 | |
| 49 | typedef ypipe_t<msg_t, message_pipe_granularity> upipe_normal_t; |
| 50 | typedef ypipe_conflate_t<msg_t> upipe_conflate_t; |
| 51 | |
| 52 | pipe_t::upipe_t *upipe1; |
| 53 | if (conflate_[0]) |
| 54 | upipe1 = new (std::nothrow) upipe_conflate_t (); |
| 55 | else |
| 56 | upipe1 = new (std::nothrow) upipe_normal_t (); |
| 57 | alloc_assert (upipe1); |
| 58 | |
| 59 | pipe_t::upipe_t *upipe2; |
| 60 | if (conflate_[1]) |
| 61 | upipe2 = new (std::nothrow) upipe_conflate_t (); |
| 62 | else |
| 63 | upipe2 = new (std::nothrow) upipe_normal_t (); |
| 64 | alloc_assert (upipe2); |
| 65 | |
| 66 | pipes_[0] = new (std::nothrow) |
| 67 | pipe_t (parents_[0], upipe1, upipe2, hwms_[1], hwms_[0], conflate_[0]); |
| 68 | alloc_assert (pipes_[0]); |
| 69 | pipes_[1] = new (std::nothrow) |
| 70 | pipe_t (parents_[1], upipe2, upipe1, hwms_[0], hwms_[1], conflate_[1]); |
| 71 | alloc_assert (pipes_[1]); |
| 72 | |
| 73 | pipes_[0]->set_peer (pipes_[1]); |
| 74 | pipes_[1]->set_peer (pipes_[0]); |
| 75 | |
| 76 | return 0; |
| 77 | } |
| 78 | |
| 79 | void zmq::send_routing_id (pipe_t *pipe_, const options_t &options_) |
| 80 | { |
| 81 | zmq::msg_t id; |
| 82 | const int rc = id.init_size (options_.routing_id_size); |
| 83 | errno_assert (rc == 0); |
| 84 | memcpy (id.data (), options_.routing_id, options_.routing_id_size); |
| 85 | id.set_flags (zmq::msg_t::routing_id); |
| 86 | const bool written = pipe_->write (&id); |
| 87 | zmq_assert (written); |
| 88 | pipe_->flush (); |
| 89 | } |
| 90 | |
| 91 | zmq::pipe_t::pipe_t (object_t *parent_, |
| 92 | upipe_t *inpipe_, |
| 93 | upipe_t *outpipe_, |
| 94 | int inhwm_, |
| 95 | int outhwm_, |
| 96 | bool conflate_) : |
| 97 | object_t (parent_), |
| 98 | _in_pipe (inpipe_), |
| 99 | _out_pipe (outpipe_), |
| 100 | _in_active (true), |
| 101 | _out_active (true), |
| 102 | _hwm (outhwm_), |
| 103 | _lwm (compute_lwm (inhwm_)), |
| 104 | _in_hwm_boost (-1), |
| 105 | _out_hwm_boost (-1), |
| 106 | _msgs_read (0), |
| 107 | _msgs_written (0), |
| 108 | _peers_msgs_read (0), |
| 109 | _peer (NULL), |
| 110 | _sink (NULL), |
| 111 | _state (active), |
| 112 | _delay (true), |
| 113 | _server_socket_routing_id (0), |
| 114 | _conflate (conflate_) |
| 115 | { |
| 116 | } |
| 117 | |
| 118 | zmq::pipe_t::~pipe_t () |
| 119 | { |
| 120 | } |
| 121 | |
| 122 | void zmq::pipe_t::set_peer (pipe_t *peer_) |
| 123 | { |
| 124 | // Peer can be set once only. |
| 125 | zmq_assert (!_peer); |
| 126 | _peer = peer_; |
| 127 | } |
| 128 | |
| 129 | void zmq::pipe_t::set_event_sink (i_pipe_events *sink_) |
| 130 | { |
| 131 | // Sink can be set once only. |
| 132 | zmq_assert (!_sink); |
| 133 | _sink = sink_; |
| 134 | } |
| 135 | |
| 136 | void zmq::pipe_t::set_server_socket_routing_id ( |
| 137 | uint32_t server_socket_routing_id_) |
| 138 | { |
| 139 | _server_socket_routing_id = server_socket_routing_id_; |
| 140 | } |
| 141 | |
| 142 | uint32_t zmq::pipe_t::get_server_socket_routing_id () const |
| 143 | { |
| 144 | return _server_socket_routing_id; |
| 145 | } |
| 146 | |
| 147 | void zmq::pipe_t::set_router_socket_routing_id ( |
| 148 | const blob_t &router_socket_routing_id_) |
| 149 | { |
| 150 | _router_socket_routing_id.set_deep_copy (router_socket_routing_id_); |
| 151 | } |
| 152 | |
| 153 | const zmq::blob_t &zmq::pipe_t::get_routing_id () const |
| 154 | { |
| 155 | return _router_socket_routing_id; |
| 156 | } |
| 157 | |
| 158 | bool zmq::pipe_t::check_read () |
| 159 | { |
| 160 | if (unlikely (!_in_active)) |
| 161 | return false; |
| 162 | if (unlikely (_state != active && _state != waiting_for_delimiter)) |
| 163 | return false; |
| 164 | |
| 165 | // Check if there's an item in the pipe. |
| 166 | if (!_in_pipe->check_read ()) { |
| 167 | _in_active = false; |
| 168 | return false; |
| 169 | } |
| 170 | |
| 171 | // If the next item in the pipe is message delimiter, |
| 172 | // initiate termination process. |
| 173 | if (_in_pipe->probe (is_delimiter)) { |
| 174 | msg_t msg; |
| 175 | const bool ok = _in_pipe->read (&msg); |
| 176 | zmq_assert (ok); |
| 177 | process_delimiter (); |
| 178 | return false; |
| 179 | } |
| 180 | |
| 181 | return true; |
| 182 | } |
| 183 | |
| 184 | bool zmq::pipe_t::read (msg_t *msg_) |
| 185 | { |
| 186 | if (unlikely (!_in_active)) |
| 187 | return false; |
| 188 | if (unlikely (_state != active && _state != waiting_for_delimiter)) |
| 189 | return false; |
| 190 | |
| 191 | for (bool payload_read = false; !payload_read;) { |
| 192 | if (!_in_pipe->read (msg_)) { |
| 193 | _in_active = false; |
| 194 | return false; |
| 195 | } |
| 196 | |
| 197 | // If this is a credential, ignore it and receive next message. |
| 198 | if (unlikely (msg_->is_credential ())) { |
| 199 | const int rc = msg_->close (); |
| 200 | zmq_assert (rc == 0); |
| 201 | } else |
| 202 | payload_read = true; |
| 203 | } |
| 204 | |
| 205 | // If delimiter was read, start termination process of the pipe. |
| 206 | if (msg_->is_delimiter ()) { |
| 207 | process_delimiter (); |
| 208 | return false; |
| 209 | } |
| 210 | |
| 211 | if (!(msg_->flags () & msg_t::more) && !msg_->is_routing_id ()) |
| 212 | _msgs_read++; |
| 213 | |
| 214 | if (_lwm > 0 && _msgs_read % _lwm == 0) |
| 215 | send_activate_write (_peer, _msgs_read); |
| 216 | |
| 217 | return true; |
| 218 | } |
| 219 | |
| 220 | bool zmq::pipe_t::check_write () |
| 221 | { |
| 222 | if (unlikely (!_out_active || _state != active)) |
| 223 | return false; |
| 224 | |
| 225 | const bool full = !check_hwm (); |
| 226 | |
| 227 | if (unlikely (full)) { |
| 228 | _out_active = false; |
| 229 | return false; |
| 230 | } |
| 231 | |
| 232 | return true; |
| 233 | } |
| 234 | |
| 235 | bool zmq::pipe_t::write (msg_t *msg_) |
| 236 | { |
| 237 | if (unlikely (!check_write ())) |
| 238 | return false; |
| 239 | |
| 240 | const bool more = (msg_->flags () & msg_t::more) != 0; |
| 241 | const bool is_routing_id = msg_->is_routing_id (); |
| 242 | _out_pipe->write (*msg_, more); |
| 243 | if (!more && !is_routing_id) |
| 244 | _msgs_written++; |
| 245 | |
| 246 | return true; |
| 247 | } |
| 248 | |
| 249 | void zmq::pipe_t::rollback () const |
| 250 | { |
| 251 | // Remove incomplete message from the outbound pipe. |
| 252 | msg_t msg; |
| 253 | if (_out_pipe) { |
| 254 | while (_out_pipe->unwrite (&msg)) { |
| 255 | zmq_assert (msg.flags () & msg_t::more); |
| 256 | const int rc = msg.close (); |
| 257 | errno_assert (rc == 0); |
| 258 | } |
| 259 | } |
| 260 | } |
| 261 | |
| 262 | void zmq::pipe_t::flush () |
| 263 | { |
| 264 | // The peer does not exist anymore at this point. |
| 265 | if (_state == term_ack_sent) |
| 266 | return; |
| 267 | |
| 268 | if (_out_pipe && !_out_pipe->flush ()) |
| 269 | send_activate_read (_peer); |
| 270 | } |
| 271 | |
| 272 | void zmq::pipe_t::process_activate_read () |
| 273 | { |
| 274 | if (!_in_active && (_state == active || _state == waiting_for_delimiter)) { |
| 275 | _in_active = true; |
| 276 | _sink->read_activated (this); |
| 277 | } |
| 278 | } |
| 279 | |
| 280 | void zmq::pipe_t::process_activate_write (uint64_t msgs_read_) |
| 281 | { |
| 282 | // Remember the peer's message sequence number. |
| 283 | _peers_msgs_read = msgs_read_; |
| 284 | |
| 285 | if (!_out_active && _state == active) { |
| 286 | _out_active = true; |
| 287 | _sink->write_activated (this); |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | void zmq::pipe_t::process_hiccup (void *pipe_) |
| 292 | { |
| 293 | // Destroy old outpipe. Note that the read end of the pipe was already |
| 294 | // migrated to this thread. |
| 295 | zmq_assert (_out_pipe); |
| 296 | _out_pipe->flush (); |
| 297 | msg_t msg; |
| 298 | while (_out_pipe->read (&msg)) { |
| 299 | if (!(msg.flags () & msg_t::more)) |
| 300 | _msgs_written--; |
| 301 | const int rc = msg.close (); |
| 302 | errno_assert (rc == 0); |
| 303 | } |
| 304 | LIBZMQ_DELETE (_out_pipe); |
| 305 | |
| 306 | // Plug in the new outpipe. |
| 307 | zmq_assert (pipe_); |
| 308 | _out_pipe = static_cast<upipe_t *> (pipe_); |
| 309 | _out_active = true; |
| 310 | |
| 311 | // If appropriate, notify the user about the hiccup. |
| 312 | if (_state == active) |
| 313 | _sink->hiccuped (this); |
| 314 | } |
| 315 | |
| 316 | void zmq::pipe_t::process_pipe_term () |
| 317 | { |
| 318 | zmq_assert (_state == active || _state == delimiter_received |
| 319 | || _state == term_req_sent1); |
| 320 | |
| 321 | // This is the simple case of peer-induced termination. If there are no |
| 322 | // more pending messages to read, or if the pipe was configured to drop |
| 323 | // pending messages, we can move directly to the term_ack_sent state. |
| 324 | // Otherwise we'll hang up in waiting_for_delimiter state till all |
| 325 | // pending messages are read. |
| 326 | if (_state == active) { |
| 327 | if (_delay) |
| 328 | _state = waiting_for_delimiter; |
| 329 | else { |
| 330 | _state = term_ack_sent; |
| 331 | _out_pipe = NULL; |
| 332 | send_pipe_term_ack (_peer); |
| 333 | } |
| 334 | } |
| 335 | |
| 336 | // Delimiter happened to arrive before the term command. Now we have the |
| 337 | // term command as well, so we can move straight to term_ack_sent state. |
| 338 | else if (_state == delimiter_received) { |
| 339 | _state = term_ack_sent; |
| 340 | _out_pipe = NULL; |
| 341 | send_pipe_term_ack (_peer); |
| 342 | } |
| 343 | |
| 344 | // This is the case where both ends of the pipe are closed in parallel. |
| 345 | // We simply reply to the request by ack and continue waiting for our |
| 346 | // own ack. |
| 347 | else if (_state == term_req_sent1) { |
| 348 | _state = term_req_sent2; |
| 349 | _out_pipe = NULL; |
| 350 | send_pipe_term_ack (_peer); |
| 351 | } |
| 352 | } |
| 353 | |
| 354 | void zmq::pipe_t::process_pipe_term_ack () |
| 355 | { |
| 356 | // Notify the user that all the references to the pipe should be dropped. |
| 357 | zmq_assert (_sink); |
| 358 | _sink->pipe_terminated (this); |
| 359 | |
| 360 | // In term_ack_sent and term_req_sent2 states there's nothing to do. |
| 361 | // Simply deallocate the pipe. In term_req_sent1 state we have to ack |
| 362 | // the peer before deallocating this side of the pipe. |
| 363 | // All the other states are invalid. |
| 364 | if (_state == term_req_sent1) { |
| 365 | _out_pipe = NULL; |
| 366 | send_pipe_term_ack (_peer); |
| 367 | } else |
| 368 | zmq_assert (_state == term_ack_sent || _state == term_req_sent2); |
| 369 | |
| 370 | // We'll deallocate the inbound pipe, the peer will deallocate the outbound |
| 371 | // pipe (which is an inbound pipe from its point of view). |
| 372 | // First, delete all the unread messages in the pipe. We have to do it by |
| 373 | // hand because msg_t doesn't have automatic destructor. Then deallocate |
| 374 | // the ypipe itself. |
| 375 | |
| 376 | if (!_conflate) { |
| 377 | msg_t msg; |
| 378 | while (_in_pipe->read (&msg)) { |
| 379 | const int rc = msg.close (); |
| 380 | errno_assert (rc == 0); |
| 381 | } |
| 382 | } |
| 383 | |
| 384 | LIBZMQ_DELETE (_in_pipe); |
| 385 | |
| 386 | // Deallocate the pipe object |
| 387 | delete this; |
| 388 | } |
| 389 | |
| 390 | void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_) |
| 391 | { |
| 392 | set_hwms (inhwm_, outhwm_); |
| 393 | } |
| 394 | |
| 395 | void zmq::pipe_t::set_nodelay () |
| 396 | { |
| 397 | this->_delay = false; |
| 398 | } |
| 399 | |
| 400 | void zmq::pipe_t::terminate (bool delay_) |
| 401 | { |
| 402 | // Overload the value specified at pipe creation. |
| 403 | _delay = delay_; |
| 404 | |
| 405 | // If terminate was already called, we can ignore the duplicate invocation. |
| 406 | if (_state == term_req_sent1 || _state == term_req_sent2) { |
| 407 | return; |
| 408 | } |
| 409 | // If the pipe is in the final phase of async termination, it's going to |
| 410 | // closed anyway. No need to do anything special here. |
| 411 | if (_state == term_ack_sent) { |
| 412 | return; |
| 413 | } |
| 414 | // The simple sync termination case. Ask the peer to terminate and wait |
| 415 | // for the ack. |
| 416 | if (_state == active) { |
| 417 | send_pipe_term (_peer); |
| 418 | _state = term_req_sent1; |
| 419 | } |
| 420 | // There are still pending messages available, but the user calls |
| 421 | // 'terminate'. We can act as if all the pending messages were read. |
| 422 | else if (_state == waiting_for_delimiter && !_delay) { |
| 423 | // Drop any unfinished outbound messages. |
| 424 | rollback (); |
| 425 | _out_pipe = NULL; |
| 426 | send_pipe_term_ack (_peer); |
| 427 | _state = term_ack_sent; |
| 428 | } |
| 429 | // If there are pending messages still available, do nothing. |
| 430 | else if (_state == waiting_for_delimiter) { |
| 431 | } |
| 432 | // We've already got delimiter, but not term command yet. We can ignore |
| 433 | // the delimiter and ack synchronously terminate as if we were in |
| 434 | // active state. |
| 435 | else if (_state == delimiter_received) { |
| 436 | send_pipe_term (_peer); |
| 437 | _state = term_req_sent1; |
| 438 | } |
| 439 | // There are no other states. |
| 440 | else { |
| 441 | zmq_assert (false); |
| 442 | } |
| 443 | |
| 444 | // Stop outbound flow of messages. |
| 445 | _out_active = false; |
| 446 | |
| 447 | if (_out_pipe) { |
| 448 | // Drop any unfinished outbound messages. |
| 449 | rollback (); |
| 450 | |
| 451 | // Write the delimiter into the pipe. Note that watermarks are not |
| 452 | // checked; thus the delimiter can be written even when the pipe is full. |
| 453 | msg_t msg; |
| 454 | msg.init_delimiter (); |
| 455 | _out_pipe->write (msg, false); |
| 456 | flush (); |
| 457 | } |
| 458 | } |
| 459 | |
| 460 | bool zmq::pipe_t::is_delimiter (const msg_t &msg_) |
| 461 | { |
| 462 | return msg_.is_delimiter (); |
| 463 | } |
| 464 | |
| 465 | int zmq::pipe_t::compute_lwm (int hwm_) |
| 466 | { |
| 467 | // Compute the low water mark. Following point should be taken |
| 468 | // into consideration: |
| 469 | // |
| 470 | // 1. LWM has to be less than HWM. |
| 471 | // 2. LWM cannot be set to very low value (such as zero) as after filling |
| 472 | // the queue it would start to refill only after all the messages are |
| 473 | // read from it and thus unnecessarily hold the progress back. |
| 474 | // 3. LWM cannot be set to very high value (such as HWM-1) as it would |
| 475 | // result in lock-step filling of the queue - if a single message is |
| 476 | // read from a full queue, writer thread is resumed to write exactly one |
| 477 | // message to the queue and go back to sleep immediately. This would |
| 478 | // result in low performance. |
| 479 | // |
| 480 | // Given the 3. it would be good to keep HWM and LWM as far apart as |
| 481 | // possible to reduce the thread switching overhead to almost zero. |
| 482 | // Let's make LWM 1/2 of HWM. |
| 483 | const int result = (hwm_ + 1) / 2; |
| 484 | |
| 485 | return result; |
| 486 | } |
| 487 | |
| 488 | void zmq::pipe_t::process_delimiter () |
| 489 | { |
| 490 | zmq_assert (_state == active || _state == waiting_for_delimiter); |
| 491 | |
| 492 | if (_state == active) |
| 493 | _state = delimiter_received; |
| 494 | else { |
| 495 | _out_pipe = NULL; |
| 496 | send_pipe_term_ack (_peer); |
| 497 | _state = term_ack_sent; |
| 498 | } |
| 499 | } |
| 500 | |
| 501 | void zmq::pipe_t::hiccup () |
| 502 | { |
| 503 | // If termination is already under way do nothing. |
| 504 | if (_state != active) |
| 505 | return; |
| 506 | |
| 507 | // We'll drop the pointer to the inpipe. From now on, the peer is |
| 508 | // responsible for deallocating it. |
| 509 | |
| 510 | // Create new inpipe. |
| 511 | _in_pipe = |
| 512 | _conflate |
| 513 | ? static_cast<upipe_t *> (new (std::nothrow) ypipe_conflate_t<msg_t> ()) |
| 514 | : new (std::nothrow) ypipe_t<msg_t, message_pipe_granularity> (); |
| 515 | |
| 516 | alloc_assert (_in_pipe); |
| 517 | _in_active = true; |
| 518 | |
| 519 | // Notify the peer about the hiccup. |
| 520 | send_hiccup (_peer, _in_pipe); |
| 521 | } |
| 522 | |
| 523 | void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_) |
| 524 | { |
| 525 | int in = inhwm_ + std::max (_in_hwm_boost, 0); |
| 526 | int out = outhwm_ + std::max (_out_hwm_boost, 0); |
| 527 | |
| 528 | // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite |
| 529 | if (inhwm_ <= 0 || _in_hwm_boost == 0) |
| 530 | in = 0; |
| 531 | |
| 532 | if (outhwm_ <= 0 || _out_hwm_boost == 0) |
| 533 | out = 0; |
| 534 | |
| 535 | _lwm = compute_lwm (in); |
| 536 | _hwm = out; |
| 537 | } |
| 538 | |
| 539 | void zmq::pipe_t::set_hwms_boost (int inhwmboost_, int outhwmboost_) |
| 540 | { |
| 541 | _in_hwm_boost = inhwmboost_; |
| 542 | _out_hwm_boost = outhwmboost_; |
| 543 | } |
| 544 | |
| 545 | bool zmq::pipe_t::check_hwm () const |
| 546 | { |
| 547 | const bool full = |
| 548 | _hwm > 0 && _msgs_written - _peers_msgs_read >= uint64_t (_hwm); |
| 549 | return !full; |
| 550 | } |
| 551 | |
| 552 | void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_) |
| 553 | { |
| 554 | send_pipe_hwm (_peer, inhwm_, outhwm_); |
| 555 | } |
| 556 | |
| 557 | void zmq::pipe_t::set_endpoint_pair (zmq::endpoint_uri_pair_t endpoint_pair_) |
| 558 | { |
| 559 | _endpoint_pair = ZMQ_MOVE (endpoint_pair_); |
| 560 | } |
| 561 | |
| 562 | const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const |
| 563 | { |
| 564 | return _endpoint_pair; |
| 565 | } |
| 566 | |
| 567 | void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_) |
| 568 | { |
| 569 | endpoint_uri_pair_t *ep = |
| 570 | new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair); |
| 571 | send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_, |
| 572 | ep); |
| 573 | } |
| 574 | |
| 575 | void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_, |
| 576 | own_t *socket_base_, |
| 577 | endpoint_uri_pair_t *endpoint_pair_) |
| 578 | { |
| 579 | send_pipe_stats_publish (socket_base_, queue_count_, |
| 580 | _msgs_written - _peers_msgs_read, endpoint_pair_); |
| 581 | } |
| 582 | |