这次主要来分析upstream中的发送数据给client, 以及当buf不足,将一部分写到temp file的部分,他们对应的函数分别是ngx_event_pipe_write_to_downstream和ngx_event_pipe_write_chain_to_temp_file.


首先来看这个函数的第一部分的代码,这部分代码主要是遍历p->in,然后计算能写多少buf到文件(temp file的size是有限制的).

    if (p->buf_to_file) {
        fl.buf = p->buf_to_file;
        fl.next = p->in;
        out = &fl;

    } else {
        out = p->in;
    if (!p->cacheable) {

        size = 0;
        cl = out;
        ll = NULL;

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe offset: %O", p->temp_file->offset);
        do {
            bsize = cl->buf->last - cl->buf->pos;
            if ((size + bsize > p->temp_file_write_size)
               || (p->temp_file->offset + size + bsize > p->max_temp_file_size))

            size += bsize;
            ll = &cl->next;
            cl = cl->next;

        } while (cl);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);

        if (ll == NULL) {
            return NGX_BUSY;
//cl存在则说明只有一部分buf能够写入到temp file,此时p->in保存剩下的chain
        if (cl) {
           p->in = cl;
           *ll = NULL;

        } else {
//否则说明所有的buf都写入到了temp file,此时p->in则设置为空
           p->in = NULL;
           p->last_in = &p->in;

    } else {
        p->in = NULL;
        p->last_in = &p->in;

然后是第二部分,也就是最后一部分,这部分就是写buf到temp file,然后将已经写到temp file的buf,挂载到free_raw_bufs,以供继续使用。这里有用到shadow buf,因为shadow buf的内存是已经分配好的,而当已经写入到temp file之后,这部分buf自然就可以重新使用了。

还有一个很重要的操作,那就是将已经写入到temp file的buf 挂载到p->out上.

    if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) {
        return NGX_ABORT;
    for (last_free = &p->free_raw_bufs;
         *last_free != NULL;
         last_free = &(*last_free)->next)
        /* void */

    if (p->buf_to_file) {
        p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
        p->buf_to_file = NULL;
        out = out->next;
    for (cl = out; cl; cl = next) {
        next = cl->next;
        cl->next = NULL;

        b = cl->buf;
        b->file = &p->temp_file->file;
        b->file_pos = p->temp_file->offset;
        p->temp_file->offset += b->last - b->pos;
        b->file_last = p->temp_file->offset;

        b->in_file = 1;
        b->temp_file = 1;
        if (p->out) {
            *p->last_out = cl;
        } else {
            p->out = cl;
        p->last_out = &cl->next;
        if (b->last_shadow) {

            tl = ngx_alloc_chain_link(p->pool);
            if (tl == NULL) {
                return NGX_ABORT;
            tl->buf = b->shadow;
            tl->next = NULL;
            *last_free = tl;
            last_free = &tl->next;
//reset buf
            b->shadow->pos = b->shadow->start;
            b->shadow->last = b->shadow->start;

然后来看ngx_event_pipe_write_to_downstream,也就是发送数据到client的部分,这个函数整体是一个大循环, 而在这个大循环内分为三部分,其中第一部分处理upstream已经发送完毕(比如断开连接,出错等)时的情况,第二部分是对发送前的buf进行一些处理(比如busy buf,p->in,p->out等),然后循环发送,第三部分就是调用发送接口(output_filter)发送数据,然后update chain.



然后时buf的recycle属性,这个属性主要目的是这样子的,由于在p->input_filter中,nginx制造了一个buf充当已经读取的buf的shadow(last_shadow = 1),而在以后,nginx操作的都是这个buf,这个buf就被设置为recycled,也就是可以循环利用。而且当设置了recycled,则也将会在发送数据的时候,立即将buf发送出去,而不会缓存(可以看ngx_http_write_filter中的判断).

        if (p->upstream_eof || p->upstream_error || p->upstream_done) {

            /* pass the p->out and p->in chains to the output filter */
            for (cl = p->busy; cl; cl = cl->next) {
                cl->buf->recycled = 0;
            if (p->out) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush out");
                for (cl = p->out; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                rc = p->output_filter(p->output_ctx, p->out);
                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);

                p->out = NULL;
            if (p->in) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush in");
                for (cl = p->in; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                rc = p->output_filter(p->output_ctx, p->in);
                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);

                p->in = NULL;
            if (p->cacheable && p->buf_to_file) {

                file.buf = p->buf_to_file;
                file.next = NULL;

                if (ngx_write_chain_to_temp_file(p->temp_file, &file)
                    == NGX_ERROR)
                    return NGX_ABORT;

            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe write downstream done");

            /* TODO: free unused bufs */

            p->downstream_done = 1;



它的步骤是这样子的,首先会计算busy chain的大小,因为我们有busy chain的限制(有busy buf的命令).然后计算p->in/p->out,最后得到一个最大的chain,然后发送。这里要注意,我们发送的每个buf大小是不会大于busy buf的大小的。

        if (downstream->data != p->output_ctx
            || !downstream->write->ready
            || downstream->write->delayed)

        prev = NULL;
        bsize = 0;
        for (cl = p->busy; cl; cl = cl->next) {

            if (cl->buf->recycled) {
                if (prev == cl->buf->start) {
                bsize += cl->buf->end - cl->buf->start;
                prev = cl->buf->start;

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write busy: %uz", bsize);

        out = NULL;
//如果bsize大于我们设置的busy buf size,则直接发送数据
        if (bsize >= (size_t) p->busy_size) {
            flush = 1;
            goto flush;

        flush = 0;
        ll = NULL;
        prev_last_shadow = 1;
        for ( ;; ) {
            if (p->out) {
                cl = p->out;
//计算将要发送的buf是否大于我们设置的busy_size,而cl->buf->last - cl->buf->pos是肯定小于等于busy_size.
                if (cl->buf->recycled
                    && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
                    flush = 1;

                p->out = p->out->next;
//将shadow buf 放到free_raw_bufs中,以便后面使用
                ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf);

            } else if (!p->cacheable && p->in) {
                cl = p->in;

                ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write buf ls:%d %p %z",
                               cl->buf->last - cl->buf->pos);
                if (cl->buf->recycled
                    && cl->buf->last_shadow
                    && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
                    if (!prev_last_shadow) {
//设置out chain
                        p->in = p->in->next;

                        cl->next = NULL;

                        if (out) {
                            *ll = cl;
                        } else {
                            out = cl;

                    flush = 1;

                prev_last_shadow = cl->buf->last_shadow;

                p->in = p->in->next;

            } else {
            if (cl->buf->recycled) {
                bsize += cl->buf->last - cl->buf->pos;

            cl->next = NULL;
            if (out) {
                *ll = cl;
            } else {
                out = cl;
            ll = &cl->next;

然后是最后一部分,也就是发送chain,然后update chain的操作(类似chain output的操作)


        rc = p->output_filter(p->output_ctx, out);
        if (rc == NGX_ERROR) {
            p->downstream_error = 1;
            return ngx_event_pipe_drain_chains(p);
//update chain,将已经完全发送的chain保存到free,还没发送的保存到busy.
        ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);
//遍历free chain,
        for (cl = p->free; cl; cl = cl->next) {

            if (cl->buf->temp_file) {
                if (p->cacheable || !p->cyclic_temp_file) {

                /* reset p->temp_offset if all bufs had been sent */

                if (cl->buf->file_last == p->temp_file->offset) {
                    p->temp_file->offset = 0;

            /* TODO: free buf if p->free_bufs && upstream done */

            /* add the free shadow raw buf to p->free_raw_bufs */
            if (cl->buf->last_shadow) {
                if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
                    return NGX_ABORT;

                cl->buf->last_shadow = 0;

            cl->buf->shadow = NULL;

上面的分析,还遗漏了一个函数,那就是ngx_event_pipe_drain_chains,这个函数被调用,说明client出错,此时则需要将对应的shadow buf放到free_raw_buf中(调用(ngx_event_pipe_add_free_buf).


并且,这里只有cache没有打开的时候,才会finalize 当前的request,这是因为当cache打开的时候,当前的request 是需要被cache,然后下次请求再次到达,就可以发送cache的了。所以此时需要接受完后端所有的数据。


static void
ngx_http_upstream_process_request(ngx_http_request_t *r)
    ngx_uint_t            del;
    ngx_temp_file_t      *tf;
    ngx_event_pipe_t     *p;
    ngx_http_upstream_t  *u;

    u = r->upstream;
    p = u->pipe;
//当finalize upstream之后,connection将会被赋值为NULL
    if (u->peer.connection) {

        if (u->store) {

            del = p->upstream_error;

            tf = u->pipe->temp_file;

            if (p->upstream_eof || p->upstream_done) {

                if (u->headers_in.status_n == NGX_HTTP_OK
                    && (u->headers_in.content_length_n == -1
                        || (u->headers_in.content_length_n == tf->offset)))
                    ngx_http_upstream_store(r, u);

                } else {
                    del = 1;

            if (del && tf->file.fd != NGX_INVALID_FILE) {

                if (ngx_delete_file(tf->file.name.data) == NGX_FILE_ERROR) {

                    ngx_log_error(NGX_LOG_CRIT, r->connection->log, ngx_errno,
                                  ngx_delete_file_n " "%s" failed",
//如果upstream端发送完毕(断开等),则finalize request。
        if (p->upstream_done || p->upstream_eof || p->upstream_error) {
            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                           "http upstream exit: %p", p->out);
            ngx_http_upstream_finalize_request(r, u, 0);
//如果downstream error设置,则进入下面的处理
    if (p->downstream_error) {
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http upstream downstream error");
//这里可以看到如果cache没有打开,则finalize 当前的request
        if (!u->cacheable && !u->store && u->peer.connection) {
            ngx_http_upstream_finalize_request(r, u, 0);

