Node.js中网络与流的示例分析

发布时间:2021-07-24 14:06:02 作者:小新
来源:亿速云 阅读:88

这篇文章给大家分享的是有关Node.js中网络与流的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

涉及的知识点

例子 tcp-echo-server/main.c

libuv 异步使用 BSD 套接字 的例子

libuv 中的网络和直接使用 BSD 套接字接口没有什么不同,有些事情更简单,都是无阻塞的,但概念都是一样的。此外,libuv 还提供了一些实用的函数来抽象出那些烦人的、重复的、低级的任务,比如使用BSD套接字结构设置套接字、DNS查询以及调整各种套接字参数。

int main() {
    loop = uv_default_loop();

    uv_tcp_t server;
    uv_tcp_init(loop, &server);

    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);

    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
    int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
    if (r) {
        fprintf(stderr, "Listen error %s\n", uv_strerror(r));
        return 1;
    }
    return uv_run(loop, UV_RUN_DEFAULT);
}

void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}

同步的例子

这是一个正常同步使用 BSD 套接字 的例子。

作为参照可以发现主要有如下几步

int main(void)
  {
    struct sockaddr_in stSockAddr;
    int SocketFD = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
  
    if(-1 == SocketFD)
    {
      perror("can not create socket");
      exit(EXIT_FAILURE);
    }
  
    memset(&stSockAddr, 0, sizeof(struct sockaddr_in));
  
    stSockAddr.sin_family = AF_INET;
    stSockAddr.sin_port = htons(1100);
    stSockAddr.sin_addr.s_addr = INADDR_ANY;
  
    if(-1 == bind(SocketFD,(const struct sockaddr *)&stSockAddr, sizeof(struct sockaddr_in)))
    {
      perror("error bind failed");
      close(SocketFD);
      exit(EXIT_FAILURE);
    }
  
    if(-1 == listen(SocketFD, 10))
    {
      perror("error listen failed");
      close(SocketFD);
      exit(EXIT_FAILURE);
    }
  
    for(;;)
    {
      int ConnectFD = accept(SocketFD, NULL, NULL);
  
      if(0 > ConnectFD)
      {
        perror("error accept failed");
        close(SocketFD);
        exit(EXIT_FAILURE);
      }
  
     /* perform read write operations ... */
  
      shutdown(ConnectFD, SHUT_RDWR);
  
      close(ConnectFD);
    }

    close(SocketFD);
    return 0;
  }

uv_tcp_init

main > uv_tcp_init

1、对 domain 进行了验证, 需要是下面3种的一种

2、tcp 也是一种流, 调用 uv__stream_init 对流数据进行初始化

int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
  return uv_tcp_init_ex(loop, tcp, AF_UNSPEC);
}

int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) {
  int domain;

  /* Use the lower 8 bits for the domain */
  domain = flags & 0xFF;
  if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
    return UV_EINVAL;

  if (flags & ~0xFF)
    return UV_EINVAL;

  uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP);

  ...

  return 0;
}

uv__stream_init

main > uv_tcp_init > uv__stream_init

流的初始化函数使用的地方还是特别多的, 也特别重要。下述 i/o 的完整实现参考 【libuv 源码学习笔记】线程池与i/o

1、对流会被调用的回调函数等进行一个初始化

// queued_fds

1. 当收到其他进程通过 ipc 写入的数据时, 调用 uv__stream_recv_cmsg 函数
2. uv__stream_recv_cmsg 函数读取到进程传递过来的 fd 引用, 调用 uv__stream_queue_fd 函数保存。
3. queued_fds 被消费主要在 src/stream_wrap.cc LibuvStreamWrap::OnUvRead > AcceptHandle 函数中。

2、其中专门为 loop->emfile_fd 通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符, 追踪发现原来是解决 accept (EMFILE错误), 下面我们讲 uv__accept 的时候再细说这个 loop->emfile_fd 的妙用。

accept处理连接时,若出现 EMFILE 错误不进行处理,则内核间隔性尝试连接,导致整个网络设计程序崩溃

3、调用 uv__io_init 初始化的该 stream 的 i/o 观察者的回调函数为 uv__stream_io

void uv__stream_init(uv_loop_t* loop,
                     uv_stream_t* stream,
                     uv_handle_type type) {
  int err;

  uv__handle_init(loop, (uv_handle_t*)stream, type);
  stream->read_cb = NULL;
  stream->alloc_cb = NULL;
  stream->close_cb = NULL;
  stream->connection_cb = NULL;
  stream->connect_req = NULL;
  stream->shutdown_req = NULL;
  stream->accepted_fd = -1;
  stream->queued_fds = NULL;
  stream->delayed_error = 0;
  QUEUE_INIT(&stream->write_queue);
  QUEUE_INIT(&stream->write_completed_queue);
  stream->write_queue_size = 0;

  if (loop->emfile_fd == -1) {
    err = uv__open_cloexec("/dev/null", O_RDONLY);
    if (err < 0)
        /* In the rare case that "/dev/null" isn't mounted open "/"
         * instead.
         */
        err = uv__open_cloexec("/", O_RDONLY);
    if (err >= 0)
      loop->emfile_fd = err;
  }

#if defined(__APPLE__)
  stream->select = NULL;
#endif /* defined(__APPLE_) */

  uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}

uv__open_cloexec

main > uv_tcp_init > uv__stream_init > uv__open_cloexec

同步调用 open 方法拿到了 fd, 也许你会问为啥不像 【libuv 源码学习笔记】线程池与i/o 中调用 uv_fs_open 异步获取 fd, 其实 libuv 中并不全部是异步的实现, 比如当前的例子启动 tcp 服务前的一些初始化, 而不是用户请求过程中发生的任务, 同步也是能接受的。

int uv__open_cloexec(const char* path, int flags) {
#if defined(O_CLOEXEC)
  int fd;

  fd = open(path, flags | O_CLOEXEC);
  if (fd == -1)
    return UV__ERR(errno);

  return fd;
#else  /* O_CLOEXEC */
  int err;
  int fd;

  fd = open(path, flags);
  if (fd == -1)
    return UV__ERR(errno);

  err = uv__cloexec(fd, 1);
  if (err) {
    uv__close(fd);
    return err;
  }

  return fd;
#endif  /* O_CLOEXEC */
}

uv__stream_io

main > uv_tcp_init > uv__stream_init > uv__stream_io

双工流的 i/o 观察者回调函数, 如调用的 stream->connect_req 函数, 其值是例子中 uv_listen 函数的最后一个参数 on_new_connection。

static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  uv_stream_t* stream;

  stream = container_of(w, uv_stream_t, io_watcher);

  assert(stream->type == UV_TCP ||
         stream->type == UV_NAMED_PIPE ||
         stream->type == UV_TTY);
  assert(!(stream->flags & UV_HANDLE_CLOSING));

  if (stream->connect_req) {
    uv__stream_connect(stream);
    return;
  }

  assert(uv__stream_fd(stream) >= 0);

  if (events & (POLLIN | POLLERR | POLLHUP))
    uv__read(stream);

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if ((events & POLLHUP) &&
      (stream->flags & UV_HANDLE_READING) &&
      (stream->flags & UV_HANDLE_READ_PARTIAL) &&
      !(stream->flags & UV_HANDLE_READ_EOF)) {
    uv_buf_t buf = { NULL, 0 };
    uv__stream_eof(stream, &buf);
  }

  if (uv__stream_fd(stream) == -1)
    return;  /* read_cb closed stream. */

  if (events & (POLLOUT | POLLERR | POLLHUP)) {
    uv__write(stream);
    uv__write_callbacks(stream);

    /* Write queue drained. */
    if (QUEUE_EMPTY(&stream->write_queue))
      uv__drain(stream);
  }
}

uv_ip4_addr

main > uv_ip4_addr

uv_ip4_addr 用于将人类可读的 IP 地址、端口对转换为 BSD 套接字 API 所需的 sockaddr_in 结构。

int uv_ip4_addr(const char* ip, int port, struct sockaddr_in* addr) {
  memset(addr, 0, sizeof(*addr));
  addr->sin_family = AF_INET;
  addr->sin_port = htons(port);
#ifdef SIN6_LEN
  addr->sin_len = sizeof(*addr);
#endif
  return uv_inet_pton(AF_INET, ip, &(addr->sin_addr.s_addr));
}

uv_tcp_bind

main > uv_tcp_bind

从 uv_ip4_addr 函数的实现, 其实是在 addr 的 sin_family 上面设置值为 AF_INET, 但在 uv_tcp_bind 函数里面却是从 addr 的 sa_family属性上面取的值, 这让 c 初学者的我又陷入了一阵思考 ...

sockaddr_in 和 sockaddr 是并列的结构,指向 sockaddr_in 的结构体的指针也可以指向 sockaddr 的结构体,并代替它。也就是说,你可以使用 sockaddr_in 建立你所需要的信息,然后用 memset 函数初始化就可以了memset((char*)&mysock,0,sizeof(mysock));//初始化

原来是这样, 这里通过强制指针类型转换 const struct sockaddr* addr 达到的目的, 函数的最后调用了 uv__tcp_bind 函数。

int uv_tcp_bind(uv_tcp_t* handle,
                const struct sockaddr* addr,
                unsigned int flags) {
  unsigned int addrlen;

  if (handle->type != UV_TCP)
    return UV_EINVAL;

  if (addr->sa_family == AF_INET)
    addrlen = sizeof(struct sockaddr_in);
  else if (addr->sa_family == AF_INET6)
    addrlen = sizeof(struct sockaddr_in6);
  else
    return UV_EINVAL;

  return uv__tcp_bind(handle, addr, addrlen, flags);
}

uv__tcp_bind

main > uv_tcp_bind > uv__tcp_bind

int uv__tcp_bind(uv_tcp_t* tcp,
                 const struct sockaddr* addr,
                 unsigned int addrlen,
                 unsigned int flags) {
  int err;
  int on;

  /* Cannot set IPv6-only mode on non-IPv6 socket. */
  if ((flags & UV_TCP_IPV6ONLY) && addr->sa_family != AF_INET6)
    return UV_EINVAL;

  err = maybe_new_socket(tcp, addr->sa_family, 0);
  if (err)
    return err;

  on = 1;
  if (setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
    return UV__ERR(errno);

...

  errno = 0;
  if (bind(tcp->io_watcher.fd, addr, addrlen) && errno != EADDRINUSE) {
    if (errno == EAFNOSUPPORT)
      return UV_EINVAL;
    return UV__ERR(errno);
  }
...
}

new_socket

main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket

static int new_socket(uv_tcp_t* handle, int domain, unsigned long flags) {
  struct sockaddr_storage saddr;
  socklen_t slen;
  int sockfd;
  int err;

  err = uv__socket(domain, SOCK_STREAM, 0);
  if (err < 0)
    return err;
  sockfd = err;

  err = uv__stream_open((uv_stream_t*) handle, sockfd, flags);
  
  ...

  return 0;
}

uv__stream_open

main > uv_tcp_bind > uv__tcp_bind > maybe_new_socket > new_socket > uv__stream_open

主要用于设置 stream->io_watcher.fd 为参数传入的 fd。

int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
#if defined(__APPLE__)
  int enable;
#endif

  if (!(stream->io_watcher.fd == -1 || stream->io_watcher.fd == fd))
    return UV_EBUSY;

  assert(fd >= 0);
  stream->flags |= flags;

  if (stream->type == UV_TCP) {
    if ((stream->flags & UV_HANDLE_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
      return UV__ERR(errno);

    /* TODO Use delay the user passed in. */
    if ((stream->flags & UV_HANDLE_TCP_KEEPALIVE) &&
        uv__tcp_keepalive(fd, 1, 60)) {
      return UV__ERR(errno);
    }
  }

#if defined(__APPLE__)
  enable = 1;
  if (setsockopt(fd, SOL_SOCKET, SO_OOBINLINE, &enable, sizeof(enable)) &&
      errno != ENOTSOCK &&
      errno != EINVAL) {
    return UV__ERR(errno);
  }
#endif

  stream->io_watcher.fd = fd;

  return 0;
}

uv_listen

main > uv_listen

主要调用了 uv_tcp_listen 函数。

int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
  int err;

  err = ERROR_INVALID_PARAMETER;
  switch (stream->type) {
    case UV_TCP:
      err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
      break;
    case UV_NAMED_PIPE:
      err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
      break;
    default:
      assert(0);
  }

  return uv_translate_sys_error(err);
}

uv_tcp_listen

main > uv_listen > uv_tcp_listen

int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
  ...

  if (listen(tcp->io_watcher.fd, backlog))
    return UV__ERR(errno);

  tcp->connection_cb = cb;
  tcp->flags |= UV_HANDLE_BOUND;

  /* Start listening for connections. */
  tcp->io_watcher.cb = uv__server_io;
  uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);

  return 0;
}

uv__server_io

main > uv_listen > uv_tcp_listen > uv__server_io

tcp 流的 i/o 观察者回调函数

void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  ...
  
  while (uv__stream_fd(stream) != -1) {
    assert(stream->accepted_fd == -1);

    err = uv__accept(uv__stream_fd(stream));
    if (err < 0) {
      if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
        return;  /* Not an error. */

      if (err == UV_ECONNABORTED)
        continue;  /* Ignore. Nothing we can do about that. */

      if (err == UV_EMFILE || err == UV_ENFILE) {
        err = uv__emfile_trick(loop, uv__stream_fd(stream));
        if (err == UV_EAGAIN || err == UV__ERR(EWOULDBLOCK))
          break;
      }

      stream->connection_cb(stream, err);
      continue;
    }

    UV_DEC_BACKLOG(w)
    stream->accepted_fd = err;
    stream->connection_cb(stream, 0);

    ...
}

uv__emfile_trick

main > uv_listen > uv_tcp_listen > uv__server_io > uv__emfile_trick

在上面的 uv__stream_init 函数中, 我们发现 loop 的 emfile_fd 属性上通过 uv__open_cloexec 方法创建一个指向空文件(/dev/null)的 idlefd 文件描述符。

当出现 accept (EMFILE错误)即文件描述符用尽时的错误时

首先将 loop->emfile_fd 文件描述符, 使其能 accept 新连接, 然后我们新连接将其关闭,以使其低于EMFILE的限制。接下来,我们接受所有等待的连接并关闭它们以向客户发出信号,告诉他们我们已经超载了--我们确实超载了,但是我们仍在继续工作。

static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
  int err;
  int emfile_fd;

  if (loop->emfile_fd == -1)
    return UV_EMFILE;

  uv__close(loop->emfile_fd);
  loop->emfile_fd = -1;

  do {
    err = uv__accept(accept_fd);
    if (err >= 0)
      uv__close(err);
  } while (err >= 0 || err == UV_EINTR);

  emfile_fd = uv__open_cloexec("/", O_RDONLY);
  if (emfile_fd >= 0)
    loop->emfile_fd = emfile_fd;

  return err;
}

on_new_connection

当收到一个新连接, 例子中的 on_new_connection 函数被调用

void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) {
        fprintf(stderr, "New connection error %s\n", uv_strerror(status));
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
}

uv_accept

on_new_connection > uv_accept

根据不同的协议调用不同的方法, 该例子 tcp 调用 uv__stream_open 方法

uv__stream_open 设置给初始化完成的 client 流设置了 i/o 观察者的 fd。该 fd 即是 uv__server_io 中提到的 ConnectFD 。

int uv_accept(uv_stream_t* server, uv_stream_t* client) {
  int err;

  assert(server->loop == client->loop);

  if (server->accepted_fd == -1)
    return UV_EAGAIN;

  switch (client->type) {
    case UV_NAMED_PIPE:
    case UV_TCP:
      err = uv__stream_open(client,
                            server->accepted_fd,
                            UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
      if (err) {
        /* TODO handle error */
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    case UV_UDP:
      err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
      if (err) {
        uv__close(server->accepted_fd);
        goto done;
      }
      break;

    default:
      return UV_EINVAL;
  }

  client->flags |= UV_HANDLE_BOUND;

done:
  /* Process queued fds */
  if (server->queued_fds != NULL) {
    uv__stream_queued_fds_t* queued_fds;

    queued_fds = server->queued_fds;

    /* Read first */
    server->accepted_fd = queued_fds->fds[0];

    /* All read, free */
    assert(queued_fds->offset > 0);
    if (--queued_fds->offset == 0) {
      uv__free(queued_fds);
      server->queued_fds = NULL;
    } else {
      /* Shift rest */
      memmove(queued_fds->fds,
              queued_fds->fds + 1,
              queued_fds->offset * sizeof(*queued_fds->fds));
    }
  } else {
    server->accepted_fd = -1;
    if (err == 0)
      uv__io_start(server->loop, &server->io_watcher, POLLIN);
  }
  return err;
}

uv_read_start

on_new_connection > uv_read_start

开启一个流的监听工作

uv_read_start 主要是调用了 uv__read_start 函数。开始了普通流的 i/o 过程。

int uv__read_start(uv_stream_t* stream,
                   uv_alloc_cb alloc_cb,
                   uv_read_cb read_cb) {
  assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
      stream->type == UV_TTY);

  /* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
   * expresses the desired state of the user.
   */
  stream->flags |= UV_HANDLE_READING;

  /* TODO: try to do the read inline? */
  /* TODO: keep track of tcp state. If we've gotten a EOF then we should
   * not start the IO watcher.
   */
  assert(uv__stream_fd(stream) >= 0);
  assert(alloc_cb);

  stream->read_cb = read_cb;
  stream->alloc_cb = alloc_cb;

  uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
  uv__handle_start(stream);
  uv__stream_osx_interrupt_select(stream);

  return 0;
}

感谢各位的阅读!关于“Node.js中网络与流的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

推荐阅读:
  1. Node.js中阻塞与非阻塞的示例分析
  2. node.js中stream流中可读流和可写流的实现与使用方法实例分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

node.js

上一篇:php模板的原理是什么

下一篇:SpringBoot中如何配置Druid数据库连接池

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》