最新消息:

nginx中upstream的设计和实现(五)

server admin 3682浏览 0评论

原创文章,转载请注明: 转载自pagefault

本文链接地址: nginx中upstream的设计和实现(五)

这次主要来分析upstream中的发送数据给client, 以及当buf不足,将一部分写到temp file的部分,他们对应的函数分别是ngx_event_pipe_write_to_downstream和ngx_event_pipe_write_chain_to_temp_file.

先来看ngx_event_pipe_write_to_downstream,这个函数顾名思义,就是写buf到临时文件。而所写的buf就是p->in,也就是将要发送给client的数据。
这个函数,它会处理两类的情况,一类是cache打开,一类是cache未打开。我们这里主要来分析cache关闭的情况。

首先来看这个函数的第一部分的代码,这部分代码主要是遍历p->in,然后计算能写多少buf到文件(temp file的size是有限制的).

//out就是将要保存到file的数据
    if (p->buf_to_file) {
//cache打开的情况
        fl.buf = p->buf_to_file;
        fl.next = p->in;
        out = &fl;

    } else {
//得到数据
        out = p->in;
    }
//如果cache没有打开
    if (!p->cacheable) {

        size = 0;
        cl = out;
        ll = NULL;

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe offset: %O", p->temp_file->offset);
//开始遍历out
        do {
//计算大小
            bsize = cl->buf->last - cl->buf->pos;
.....................................................
//看是否超过限制限制
            if ((size + bsize > p->temp_file_write_size)
               || (p->temp_file->offset + size + bsize > p->max_temp_file_size))
            {
                break;
            }

            size += bsize;
            ll = &cl->next;
            cl = cl->next;

        } while (cl);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);

        if (ll == NULL) {
            return NGX_BUSY;
        }
//cl存在则说明只有一部分buf能够写入到temp file,此时p->in保存剩下的chain
        if (cl) {
           p->in = cl;
           *ll = NULL;

        } else {
//否则说明所有的buf都写入到了temp file,此时p->in则设置为空
           p->in = NULL;
           p->last_in = &p->in;
        }

    } else {
//cache打开的情况,可以看到和上面类似.
        p->in = NULL;
        p->last_in = &p->in;
    }

然后是第二部分,也就是最后一部分,这部分就是写buf到temp file,然后将已经写到temp file的buf,挂载到free_raw_bufs,以供继续使用。这里有用到shadow buf,因为shadow buf的内存是已经分配好的,而当已经写入到temp file之后,这部分buf自然就可以重新使用了。

还有一个很重要的操作,那就是将已经写入到temp file的buf 挂载到p->out上.

//写out到tempfile
    if (ngx_write_chain_to_temp_file(p->temp_file, out) == NGX_ERROR) {
        return NGX_ABORT;
    }
//遍历free_raw_bufs,以便与接下来将使用过的buf挂载到后面。
    for (last_free = &p->free_raw_bufs;
         *last_free != NULL;
         last_free = &(*last_free)->next)
    {
        /* void */
    }

    if (p->buf_to_file) {
        p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
        p->buf_to_file = NULL;
        out = out->next;
    }
//遍历out
    for (cl = out; cl; cl = next) {
        next = cl->next;
        cl->next = NULL;

        b = cl->buf;
//可以看到重新设置buf为file。然后设置相关属性
        b->file = &p->temp_file->file;
        b->file_pos = p->temp_file->offset;
        p->temp_file->offset += b->last - b->pos;
        b->file_last = p->temp_file->offset;

        b->in_file = 1;
        b->temp_file = 1;
//这里就是将buf放入到p->out.
        if (p->out) {
            *p->last_out = cl;
        } else {
            p->out = cl;
        }
//设置last_out
        p->last_out = &cl->next;
//shadow存在,则将已经保存到file的buf挂载到free_raw_buf中。
        if (b->last_shadow) {

            tl = ngx_alloc_chain_link(p->pool);
            if (tl == NULL) {
                return NGX_ABORT;
            }
//可以看到使用它的shadow
            tl->buf = b->shadow;
            tl->next = NULL;
//last_free就是free_raw_buf的尾部
            *last_free = tl;
            last_free = &tl->next;
//reset buf
            b->shadow->pos = b->shadow->start;
            b->shadow->last = b->shadow->start;
//remove掉shadow。
            ngx_event_pipe_remove_shadow_links(b->shadow);
        }
    }

然后来看ngx_event_pipe_write_to_downstream,也就是发送数据到client的部分,这个函数整体是一个大循环, 而在这个大循环内分为三部分,其中第一部分处理upstream已经发送完毕(比如断开连接,出错等)时的情况,第二部分是对发送前的buf进行一些处理(比如busy buf,p->in,p->out等),然后循环发送,第三部分就是调用发送接口(output_filter)发送数据,然后update chain.

ok,接下来我们就来看这三部分。先来看第一部分,这部分就是处理upstream端已经发送完毕的情况,此时需要我们立即发送数据到downstream。

这里需要注意的是在upstream发送的时候,对于buf的选择有一个顺序,通过前面一篇blog,我们能看到当拷贝从upstream的数据的时候,就有一个顺序。
这个顺序是这样子的,首先是p->out,然后是p->in,因为p->out保存了一些buf在文件中的buf。

然后时buf的recycle属性,这个属性主要目的是这样子的,由于在p->input_filter中,nginx制造了一个buf充当已经读取的buf的shadow(last_shadow = 1),而在以后,nginx操作的都是这个buf,这个buf就被设置为recycled,也就是可以循环利用。而且当设置了recycled,则也将会在发送数据的时候,立即将buf发送出去,而不会缓存(可以看ngx_http_write_filter中的判断).

//判断upstream状态
        if (p->upstream_eof || p->upstream_error || p->upstream_done) {

            /* pass the p->out and p->in chains to the output filter */
//取消掉recycled设置,
            for (cl = p->busy; cl; cl = cl->next) {
                cl->buf->recycled = 0;
            }
//首先发送p->out.
            if (p->out) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush out");
//取消掉recycled设置,因为已经不要回收这个buf了(后端不会有数据过来)。
                for (cl = p->out; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }
//调用filter函数
                rc = p->output_filter(p->output_ctx, p->out);
//如果发送失败,则设置downstream_error,并且回收chain
                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);
                }

                p->out = NULL;
            }
//发送p->in
            if (p->in) {
                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write downstream flush in");
//取消recycled设置
                for (cl = p->in; cl; cl = cl->next) {
                    cl->buf->recycled = 0;
                }
//发送数据
                rc = p->output_filter(p->output_ctx, p->in);
//同上
                if (rc == NGX_ERROR) {
                    p->downstream_error = 1;
                    return ngx_event_pipe_drain_chains(p);
                }

                p->in = NULL;
            }
//cache相关设置
            if (p->cacheable && p->buf_to_file) {

                file.buf = p->buf_to_file;
                file.next = NULL;

                if (ngx_write_chain_to_temp_file(p->temp_file, &file)
                    == NGX_ERROR)
                {
                    return NGX_ABORT;
                }
            }

            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                           "pipe write downstream done");

            /* TODO: free unused bufs */

            p->downstream_done = 1;
            break;
        }

紧接着是第二部分,这部分主要是处理upstream的数据并没有发送完全,此时nginx会尽量发送最大的可能的数据到client。

这里busy就保存了上次发送没有发送完毕的chain,它主要是为了方便统计。

它的步骤是这样子的,首先会计算busy chain的大小,因为我们有busy chain的限制(有busy buf的命令).然后计算p->in/p->out,最后得到一个最大的chain,然后发送。这里要注意,我们发送的每个buf大小是不会大于busy buf的大小的。

//判断是否需要退出循环,这里比较关键的就是downstream->write->ready,当发送返回again,就会从这里退出
        if (downstream->data != p->output_ctx
            || !downstream->write->ready
            || downstream->write->delayed)
        {
            break;
        }

        prev = NULL;
        bsize = 0;
//首先计算busy的chain的大小
        for (cl = p->busy; cl; cl = cl->next) {

            if (cl->buf->recycled) {
//如果是相同的chain,则跳过计算
                if (prev == cl->buf->start) {
                    continue;
                }
//计算大小
                bsize += cl->buf->end - cl->buf->start;
                prev = cl->buf->start;
            }
        }

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                       "pipe write busy: %uz", bsize);

        out = NULL;
//如果bsize大于我们设置的busy buf size,则直接发送数据
        if (bsize >= (size_t) p->busy_size) {
            flush = 1;
            goto flush;
        }

        flush = 0;
        ll = NULL;
        prev_last_shadow = 1;
//然后开始处理p->out和p->in
        for ( ;; ) {
            if (p->out) {
                cl = p->out;
//计算将要发送的buf是否大于我们设置的busy_size,而cl->buf->last - cl->buf->pos是肯定小于等于busy_size.
                if (cl->buf->recycled
                    && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
                {
//此时当前的cl就不能发送,所以设置flush,然后立即发送
                    flush = 1;
                    break;
                }

                p->out = p->out->next;
//将shadow buf 放到free_raw_bufs中,以便后面使用
                ngx_event_pipe_free_shadow_raw_buf(&p->free_raw_bufs, cl->buf);

            } else if (!p->cacheable && p->in) {
                cl = p->in;

                ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
                               "pipe write buf ls:%d %p %z",
                               cl->buf->last_shadow,
                               cl->buf->pos,
                               cl->buf->last - cl->buf->pos);
//类似上面的,也是需要计算buf大小,这里可以看到我们只操作影子(shadow)buf,
                if (cl->buf->recycled
                    && cl->buf->last_shadow
                    && bsize + cl->buf->last - cl->buf->pos > p->busy_size)
                {
                    if (!prev_last_shadow) {
//设置out chain
                        p->in = p->in->next;

                        cl->next = NULL;

                        if (out) {
                            *ll = cl;
                        } else {
                            out = cl;
                        }
                    }

                    flush = 1;
                    break;
                }

                prev_last_shadow = cl->buf->last_shadow;

                p->in = p->in->next;

            } else {
                break;
            }
//如果cl是recycled,则说明这个buf会被发送,因此bsize更新
            if (cl->buf->recycled) {
                bsize += cl->buf->last - cl->buf->pos;
            }

            cl->next = NULL;
//将对应的chain绑定到out上,接下来就会发送out。
            if (out) {
                *ll = cl;
            } else {
                out = cl;
            }
            ll = &cl->next;
        }

然后是最后一部分,也就是发送chain,然后update chain的操作(类似chain output的操作)

这里用有一个很重要的操作,通过上面的分析我们知道,在upstream中,基本上每个buf都会有一个shadow,而我们发送的时候,用的是shadow(last_shadow=1),当发送完毕,这个buf会被放到p->free中,此时还有一个buf,那就是原始的buf(这个buf是有分配内存的,b->start不为null),因此这里就可以重用这个buf,在nginx中会将发送完毕后的这个buf放到free_raw_buf中。

    flush:
........................................
//发送数据到client
        rc = p->output_filter(p->output_ctx, out);
//错误的话,设置downstream_error
        if (rc == NGX_ERROR) {
            p->downstream_error = 1;
            return ngx_event_pipe_drain_chains(p);
        }
//update chain,将已经完全发送的chain保存到free,还没发送的保存到busy.
        ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);
//遍历free chain,
        for (cl = p->free; cl; cl = cl->next) {

            if (cl->buf->temp_file) {
                if (p->cacheable || !p->cyclic_temp_file) {
                    continue;
                }

                /* reset p->temp_offset if all bufs had been sent */

                if (cl->buf->file_last == p->temp_file->offset) {
                    p->temp_file->offset = 0;
                }
            }

            /* TODO: free buf if p->free_bufs && upstream done */

            /* add the free shadow raw buf to p->free_raw_bufs */
//将shadow的buf放到p->free_raw_buf中.
            if (cl->buf->last_shadow) {
//可以看到这里操作的是cl->buf->shadow,也就是我们在p->input_filter中,拷贝的那个原始buf。
                if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
                    return NGX_ABORT;
                }

                cl->buf->last_shadow = 0;
            }

            cl->buf->shadow = NULL;
        }

上面的分析,还遗漏了一个函数,那就是ngx_event_pipe_drain_chains,这个函数被调用,说明client出错,此时则需要将对应的shadow buf放到free_raw_buf中(调用(ngx_event_pipe_add_free_buf).

最后还有一个问题没解决,那就是当client断开连接时(当接收完毕所有的header),nginx如何来处理,通过上面的代码,我们能够看到,当发送数据失败时,只是调用ngx_event_pipe_drain_chains然后返回,那么其实这里处理和nginx处理一般的http请求是一样的,那就是暂时忽略client的断开,也就是不管client的状态,当upstream发送完毕,才会认为request处理完成,这样子能简化代码的处理。

并且,这里只有cache没有打开的时候,才会finalize 当前的request,这是因为当cache打开的时候,当前的request 是需要被cache,然后下次请求再次到达,就可以发送cache的了。所以此时需要接受完后端所有的数据。

下面我们就来看这部分代码。

static void
ngx_http_upstream_process_request(ngx_http_request_t *r)
{
    ngx_uint_t            del;
    ngx_temp_file_t      *tf;
    ngx_event_pipe_t     *p;
    ngx_http_upstream_t  *u;

    u = r->upstream;
    p = u->pipe;
//当finalize upstream之后,connection将会被赋值为NULL
    if (u->peer.connection) {

        if (u->store) {

            del = p->upstream_error;

            tf = u->pipe->temp_file;

            if (p->upstream_eof || p->upstream_done) {

                if (u->headers_in.status_n == NGX_HTTP_OK
                    && (u->headers_in.content_length_n == -1
                        || (u->headers_in.content_length_n == tf->offset)))
                {
                    ngx_http_upstream_store(r, u);

                } else {
                    del = 1;
                }
            }

            if (del && tf->file.fd != NGX_INVALID_FILE) {

                if (ngx_delete_file(tf->file.name.data) == NGX_FILE_ERROR) {

                    ngx_log_error(NGX_LOG_CRIT, r->connection->log, ngx_errno,
                                  ngx_delete_file_n " "%s" failed",
                                  u->pipe->temp_file->file.name.data);
                }
            }
        }
//如果upstream端发送完毕(断开等),则finalize request。
        if (p->upstream_done || p->upstream_eof || p->upstream_error) {
            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                           "http upstream exit: %p", p->out);
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }
    }
//如果downstream error设置,则进入下面的处理
    if (p->downstream_error) {
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http upstream downstream error");
//这里可以看到如果cache没有打开,则finalize 当前的request
        if (!u->cacheable && !u->store && u->peer.connection) {
            ngx_http_upstream_finalize_request(r, u, 0);
        }
    }
}

Related posts:

  1. nginx中upstream的设计和实现(四)
  2. nginx中upstream的设计和实现(二)
  3. nginx中upstream的设计和实现(三)

转载请注明:爱开源 » nginx中upstream的设计和实现(五)

您必须 登录 才能发表评论!