Nginx缓存详解

1. 相关配置

1.1 配置指令

Nginx缓存由proxy_cache_path指令开启

1
proxy_cache_path D:\output levels=1:2 keys_zone=my_cache:10m max_size=2g inactive=60m use_temp_path=off;

对于每个参数的具体含义可以参考nginx官方文档,对于缓存文件名则需要proxy_cache_key指令指定

1
proxy_cache my_cache;

对于该指令的具体用法可以参考nginx官方文档

1
proxy_cache_valid 200 1h;

需要注意的是,如果没有这条指令,nginx将不会缓存上游的数据。对于该指令的具体用法可以参考nginx官方文档

2. 源码解析

Nginx与文件缓存相关的代码在src/http/ngx_http_file_cache.c

2.1 关键结构体

与缓存相关的结构体有ngx_path_tngx_http_file_cache_sh_tngx_http_file_cache_sngx_http_file_cache_node_tngx_http_file_cache_header_t,来看看这几个结构体的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158

typedef struct {
ngx_str_t name;
size_t len;
size_t level[3];

ngx_path_manager_pt manager; //决定是否启用cache manager进程
ngx_path_loader_pt loader; //决定是否启用cache loader进程
void *data;

u_char *conf_file; //nginx配置文件路径
ngx_uint_t line;
} ngx_path_t;

typedef struct {
ngx_rbtree_t rbtree;
ngx_rbtree_node_t sentinel;
ngx_queue_t queue;
ngx_atomic_t cold;
ngx_atomic_t loading;
off_t size;
ngx_uint_t count;
ngx_uint_t watermark;
} ngx_http_file_cache_sh_t;

struct ngx_http_file_cache_s {
ngx_http_file_cache_sh_t *sh;
ngx_slab_pool_t *shpool; /* shpool是用于管理共享内存的 slab allocator ,所有缓存节点占用空间都由它进行分配 */

ngx_path_t *path; /* ngx_http_file_cache_set_slot中创建ngx_path_t空间 */

off_t max_size; //缓存目录能保存的缓存最大值
size_t bsize;

time_t inactive; /* 触发LRU算法阈值 */

time_t fail_time;

ngx_uint_t files;
ngx_uint_t loader_files; /* 阈值,当load的文件个数大于这个值之后,load进程会短暂的休眠(时间位loader_sleep) */
ngx_msec_t last;
ngx_msec_t loader_sleep; /* 阈值,执行一次缓存文件加载到共享内存后 , 进程的休眠时间 , 默认200 */
ngx_msec_t loader_threshold;

ngx_uint_t manager_files;
ngx_msec_t manager_sleep; /* 阈值,处理一定数量缓存结点后, 进程的休眠时间 , 默认200 */
ngx_msec_t manager_threshold;

ngx_shm_zone_t *shm_zone; /* 共享内存区 */

ngx_uint_t use_temp_path; /* 是否使用临时目录 */
/* unsigned use_temp_path:1 */
};

typedef struct {
ngx_rbtree_node_t node; /* 缓存查询树的节点 */
ngx_queue_t queue; /* LRU页面置换算法 队列中的节点 */

u_char key[NGX_HTTP_CACHE_KEY_LEN
- sizeof(ngx_rbtree_key_t)];

unsigned count:20;
unsigned uses:10; //缓存使用次数
unsigned valid_msec:10;
unsigned error:10;
unsigned exists:1;
unsigned updating:1; // 缓存分片更新标志位
unsigned deleting:1; // 缓存分片删除标志位
unsigned purged:1;
/* 10 unused bits */

ngx_file_uniq_t uniq;
time_t expire; //过期时间
time_t valid_sec;
size_t body_start;
off_t fs_size;
ngx_msec_t lock_time;
} ngx_http_file_cache_node_t;

//在写缓存文件时会将这个结构体写入
typedef struct {
ngx_uint_t version;
time_t valid_sec;
time_t updating_sec;
time_t error_sec;
time_t last_modified;
time_t date;
uint32_t crc32;
u_short valid_msec;
u_short header_start; /* 缓存文件中http头开始的偏移 */
u_short body_start;
u_char etag_len;
u_char etag[NGX_HTTP_CACHE_ETAG_LEN];
u_char vary_len;
u_char vary[NGX_HTTP_CACHE_VARY_LEN];
u_char variant[NGX_HTTP_CACHE_KEY_LEN];
} ngx_http_file_cache_header_t;

struct ngx_http_cache_s {
ngx_file_t file; /* 缓存文件描述结构体 */
ngx_array_t keys; /* 存放proxy_cache_key指令的值 */
uint32_t crc32;
u_char key[NGX_HTTP_CACHE_KEY_LEN]; /* 存放计算md5后的值 */
u_char main[NGX_HTTP_CACHE_KEY_LEN]; /* 跟key相同 */

ngx_file_uniq_t uniq;
time_t valid_sec;
time_t updating_sec;
time_t error_sec;
time_t last_modified;
time_t date;

ngx_str_t etag;
ngx_str_t vary;
u_char variant[NGX_HTTP_CACHE_KEY_LEN];

size_t header_start; /* http头在缓存中的偏移位置 */
size_t body_start; /* http响应体在缓存中的偏移位置 */
off_t length; /* 缓存文件的大小,见ngx_http_file_cache_open */
off_t fs_size;

ngx_uint_t min_uses;
ngx_uint_t error;
ngx_uint_t valid_msec;
ngx_uint_t vary_tag;

ngx_buf_t *buf; /* 存储缓存文件头 */

ngx_http_file_cache_t *file_cache;
ngx_http_file_cache_node_t *node; //ngx_http_file_cache_exists中创建空间和赋值

#if (NGX_THREADS || NGX_COMPAT)
ngx_thread_task_t *thread_task;
#endif

ngx_msec_t lock_timeout;
ngx_msec_t lock_age;
ngx_msec_t lock_time;
ngx_msec_t wait_time;

ngx_event_t wait_event;

unsigned lock:1;
unsigned waiting:1;

unsigned updated:1;
unsigned updating:1;
unsigned exists:1;
unsigned temp_file:1;
unsigned purged:1;
unsigned reading:1;
unsigned secondary:1;
unsigned background:1;

unsigned stale_updating:1;
unsigned stale_error:1;
};

部分字段含义见注释

2.2 生成标记缓存文件的key

生成标记缓存文件的key由ngx_http_file_cache_create_key函数实现,我们来看看具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

void
ngx_http_file_cache_create_key(ngx_http_request_t *r)
{
size_t len;
ngx_str_t *key;
ngx_uint_t i;
ngx_md5_t md5;
ngx_http_cache_t *c;

c = r->cache;

len = 0;

ngx_crc32_init(c->crc32);
ngx_md5_init(&md5);

key = c->keys.elts;
for (i = 0; i < c->keys.nelts; i++) {
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http cache key: \"%V\"", &key[i]);

len += key[i].len;

ngx_crc32_update(&c->crc32, key[i].data, key[i].len);
ngx_md5_update(&md5, key[i].data, key[i].len);
}

c->header_start = sizeof(ngx_http_file_cache_header_t)
+ sizeof(ngx_http_file_cache_key) + len + 1;

ngx_crc32_final(c->crc32);
ngx_md5_final(c->key, &md5);

ngx_memcpy(c->main, c->key, NGX_HTTP_CACHE_KEY_LEN);
}

该函数实现较简单,主要是计算proxy_cache_key指令值的md5值,并保存,然后初始化header_start成员的值,这个地方需要注意一下缓存文件头部信息
[ngx_http_file_cache_header_t][“\nKEY: “][orig_key][“\n”][header][body]

2.3 缓存文件名生成

生成缓存文件名主要由ngx_http_file_cache_name实现,现在来看看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

static ngx_int_t
ngx_http_file_cache_name(ngx_http_request_t *r, ngx_path_t *path)
{
u_char *p;
ngx_http_cache_t *c;

c = r->cache;

if (c->file.name.len) {
return NGX_OK;
}

c->file.name.len = path->name.len + 1 + path->len
+ 2 * NGX_HTTP_CACHE_KEY_LEN;

c->file.name.data = ngx_pnalloc(r->pool, c->file.name.len + 1);
if (c->file.name.data == NULL) {
return NGX_ERROR;
}

ngx_memcpy(c->file.name.data, path->name.data, path->name.len);

p = c->file.name.data + path->name.len + 1 + path->len;
p = ngx_hex_dump(p, c->key, NGX_HTTP_CACHE_KEY_LEN);
*p = '\0';

ngx_create_hashed_filename(path, c->file.name.data, c->file.name.len);

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"cache file: \"%s\"", c->file.name.data);

return NGX_OK;
}

  1. 该函数首先判断当前缓存文件名是否已经生成成功,若已生成,则直接返回,代码如下:
1
2
3
if (c->file.name.len) {
return NGX_OK;
}
  1. 接下来计算缓存文件名长度,其中path->name.lenproxy_cache_path指令第一个参数,path->len为levels长度,比如level=1:2则path->len为5,包含两个/,代码如下:
1
2
c->file.name.len = path->name.len + 1 + path->len
+ 2 * NGX_HTTP_CACHE_KEY_LEN;
  1. 接着为缓存文件名申请内存,代码如下:
1
c->file.name.data = ngx_pnalloc(r->pool, c->file.name.len + 1);
  1. 接着将proxy_cache_path指令设置的路径复制到c->file.name.data最前端,完成后c->file.name.data的值为:D:\output,代码如下:
1
ngx_memcpy(c->file.name.data, path->name.data, path->name.len);
  1. 接着将p指向c->file.name.data偏移 path->name.len + 1 + path->len的位置处,这样做的目的是准备生成32位长的md5文件名,并预留level设置的目录
1
2
3
p = c->file.name.data + path->name.len + 1 + path->len;
p = ngx_hex_dump(p, c->key, NGX_HTTP_CACHE_KEY_LEN);
*p = '\0';

这一步完成之后c->file.name.data的值为D:\output屯屯屯md5(proxy_cache_key)

  1. 调用ngx_create_hashed_filename函数补全第5步预留的level目录
1
ngx_create_hashed_filename(path, c->file.name.data, c->file.name.len);

2.4 生成由levels参数指定的目录层级

生成由levels参数指定的目录层级由ngx_create_hashed_filename实现,现在来看看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void
ngx_create_hashed_filename(ngx_path_t *path, u_char *file, size_t len)
{
size_t i, level;
ngx_uint_t n;

i = path->name.len + 1;

file[path->name.len + path->len] = '/';

for (n = 0; n < NGX_MAX_PATH_LEVEL; n++) {
level = path->level[n];

if (level == 0) {
break;
}

len -= level;
file[i - 1] = '/';
ngx_memcpy(&file[i], &file[len], level);
i += level + 1;
}
}
  1. 从上一节中我们知道参数file的值为D:\output屯屯屯md5(proxy_cache_key),该函数首先用一个变量i保存path长度加1,这个设计非常巧妙,在后续中会使用到,代码如下:
1
i = path->name.len + 1;
  1. 接着修改file的值,在层级目录后添加一个反斜杠,修改后为D:\output屯屯屯/md5(proxy_cache_key)
1
file[path->name.len + path->len]  = '/';
  1. 接下来进入一个for循环,填充file中的屯屯屯,在第一次循环中首先获取levels=1:2中的1,接着用变量len减去变量level,接着在output后添加一个’/‘,然后将file中从len位置复制1个字符到output/后,修改i的值;第二次循环,首先获取levels=1:2中的2,接着用len减去2,接着在第一次缓存复制的字符后添加一个’/‘,然后把file从len处复制2个字符到上一步的’/‘后,修改i的值;第三次跳出循环,至此填充完成,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
for (n = 0; n < NGX_MAX_PATH_LEVEL; n++) {
level = path->level[n];

if (level == 0) {
break;
}

len -= level;
file[i - 1] = '/';
ngx_memcpy(&file[i], &file[len], level);
i += level + 1;
}

2.5 从红黑树中查找缓存节点

缓存key跟缓存文件名生成好之后,紧接着根据生成好的key从红黑树中查找,若不存在则插入,找到则返回对应的缓存节点,这个功能由ngx_http_file_cache_exists函数实现,我们看看具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

static ngx_int_t
ngx_http_file_cache_exists(ngx_http_file_cache_t *cache, ngx_http_cache_t *c)
{
ngx_int_t rc;
ngx_http_file_cache_node_t *fcn;

ngx_shmtx_lock(&cache->shpool->mutex);

fcn = c->node;//后面没找到则会创建node节点

if (fcn == NULL) {
fcn = ngx_http_file_cache_lookup(cache, c->key); //以 c->key 为查找条件从缓存中查找缓存节点:
}

if (fcn) { //cache中存在该key
ngx_queue_remove(&fcn->queue);

//该客户端在新建连接后,如果之前有缓存该文件,则c->node为NULL,表示这个连接请求第一次走到这里,有一个客户端在获取数据,如果在
//连接范围内(还没有断开连接)多次获取该缓存文件,则也只会加1,表示当前有多少个客户端连接在获取该缓存
if (c->node == NULL) { //如果该请求第一次使用此缓存节点,则增加相关引用和使用次数
fcn->uses++;
fcn->count++;
}

if (fcn->error) {

if (fcn->valid_sec < ngx_time()) {
goto renew; //缓存已过期
}

rc = NGX_OK;

goto done;
}

if (fcn->exists || fcn->uses >= c->min_uses) { //该请求的缓存已经存在,并且对该缓存的请求次数达到了最低要求次数min_uses
//表示该缓存文件是否存在,Proxy_cache_min_uses 3,则第3次后开始获取后端数据,获取完毕后在ngx_http_file_cache_update中置1,但是只有在地4次请求的时候才会在ngx_http_file_cache_exists赋值为1
c->exists = fcn->exists;
if (fcn->body_start) {
c->body_start = fcn->body_start;
}

rc = NGX_OK;

goto done;
}

//例如配置Proxy_cache_min_uses 5,则需要客户端请求5才才能从缓存中取,如果现在只有4次,则都需要从后端获取数据
rc = NGX_AGAIN;

goto done;
}

//没找到,则在下面创建node节点,添加到ngx_http_file_cache_t->sh->rbtree红黑树中
fcn = ngx_slab_calloc_locked(cache->shpool,
sizeof(ngx_http_file_cache_node_t));
if (fcn == NULL) {
ngx_shmtx_unlock(&cache->shpool->mutex);

(void) ngx_http_file_cache_forced_expire(cache);

ngx_shmtx_lock(&cache->shpool->mutex);

fcn = ngx_slab_calloc_locked(cache->shpool,
sizeof(ngx_http_file_cache_node_t));
if (fcn == NULL) {
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
"could not allocate node%s", cache->shpool->log_ctx);
rc = NGX_ERROR;
goto failed;
}
}

ngx_memcpy((u_char *) &fcn->node.key, c->key, sizeof(ngx_rbtree_key_t));

ngx_memcpy(fcn->key, &c->key[sizeof(ngx_rbtree_key_t)],
NGX_HTTP_CACHE_KEY_LEN - sizeof(ngx_rbtree_key_t));

ngx_rbtree_insert(&cache->sh->rbtree, &fcn->node); //把该节点添加到红黑树中

fcn->uses = 1;
fcn->count = 1;

renew:

rc = NGX_DECLINED; //uri第一次请求的时候创建node节点,同时返回NGX_DECLINED。或者缓存过期需要把该节点相关信息恢复为默认值

fcn->valid_msec = 0;
fcn->error = 0;
fcn->exists = 0;
fcn->valid_sec = 0;
fcn->uniq = 0;
fcn->body_start = 0;
fcn->fs_size = 0;

done:

fcn->expire = ngx_time() + cache->inactive;

ngx_queue_insert_head(&cache->sh->queue, &fcn->queue); //新创建的node节点添加到cache->sh->queue头部

c->uniq = fcn->uniq;//文件的uniq 赋值见ngx_http_file_cache_update
c->error = fcn->error;
c->node = fcn; //把新创建的fcn赋值给c->node

failed:

ngx_shmtx_unlock(&cache->shpool->mutex);

return rc;
}

2.6 打开缓存文件

上一节中介绍了如何从红黑树中查找缓存节点,找到缓存节点之后,就要需要根据缓存节点中的缓存文件路径去打开缓存文件了,Nginx使用ngx_http_file_cache_open函数实现,接下来我们来看看实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
ngx_int_t
ngx_http_file_cache_open(ngx_http_request_t *r)
{
ngx_int_t rc, rv;
ngx_uint_t test;
ngx_http_cache_t *c;
ngx_pool_cleanup_t *cln;
ngx_open_file_info_t of;
ngx_http_file_cache_t *cache;
ngx_http_core_loc_conf_t *clcf;

c = r->cache;

if (c->waiting) {
return NGX_AGAIN;
}

if (c->reading) {
return ngx_http_file_cache_read(r, c);
}

cache = c->file_cache;

if (c->node == NULL) {
cln = ngx_pool_cleanup_add(r->pool, 0);
if (cln == NULL) {
return NGX_ERROR;
}

cln->handler = ngx_http_file_cache_cleanup;
cln->data = c;
}

rc = ngx_http_file_cache_exists(cache, c);

ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http file cache exists: %i e:%d", rc, c->exists);

if (rc == NGX_ERROR) {
return rc;
}

if (rc == NGX_AGAIN) {
return NGX_HTTP_CACHE_SCARCE;
}

if (rc == NGX_OK) {

if (c->error) {
return c->error;
}

c->temp_file = 1;
test = c->exists ? 1 : 0;
rv = NGX_DECLINED;

} else { /* rc == NGX_DECLINED */

test = cache->sh->cold ? 1 : 0;

if (c->min_uses > 1) {

if (!test) {
return NGX_HTTP_CACHE_SCARCE;
}

rv = NGX_HTTP_CACHE_SCARCE;

} else {
c->temp_file = 1;
rv = NGX_DECLINED;
}
}

if (ngx_http_file_cache_name(r, cache->path) != NGX_OK) {
return NGX_ERROR;
}

if (!test) {
goto done;
}

clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

ngx_memzero(&of, sizeof(ngx_open_file_info_t));

of.uniq = c->uniq;
of.valid = clcf->open_file_cache_valid;
of.min_uses = clcf->open_file_cache_min_uses;
of.events = clcf->open_file_cache_events;
of.directio = NGX_OPEN_FILE_DIRECTIO_OFF;
of.read_ahead = clcf->read_ahead;

if (ngx_open_cached_file(clcf->open_file_cache, &c->file.name, &of, r->pool)
!= NGX_OK)
{
switch (of.err) {

case 0:
return NGX_ERROR;

case NGX_ENOENT:
case NGX_ENOTDIR:
goto done;

default:
ngx_log_error(NGX_LOG_CRIT, r->connection->log, of.err,
ngx_open_file_n " \"%s\" failed", c->file.name.data);
return NGX_ERROR;
}
}

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http file cache fd: %d", of.fd);

c->file.fd = of.fd;
c->file.log = r->connection->log;
c->uniq = of.uniq;
c->length = of.size;
c->fs_size = (of.fs_size + cache->bsize - 1) / cache->bsize;

c->buf = ngx_create_temp_buf(r->pool, c->body_start);
if (c->buf == NULL) {
return NGX_ERROR;
}

return ngx_http_file_cache_read(r, c);

done:

if (rv == NGX_DECLINED) {
return ngx_http_file_cache_lock(r, c);
}

return rv;

}

2.7 读缓存文件

若缓存文件存在,则会读取缓存文件头部,并根据读取出的头部信息进行下一步操作,nginx使用ngx_http_file_cache_read函数实现这个功能,我们来看一下具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
static ngx_int_t
ngx_http_file_cache_read(ngx_http_request_t *r, ngx_http_cache_t *c)
{
u_char *p;
time_t now;
ssize_t n;
ngx_str_t *key;
ngx_int_t rc;
ngx_uint_t i;
ngx_http_file_cache_t *cache;
ngx_http_file_cache_header_t *h;

n = ngx_http_file_cache_aio_read(r, c);

if (n < 0) {
return n;
}

//写缓冲区封装过程参考:ngx_http_upstream_process_header
//缓存文件中前面部分格式:[ngx_http_file_cache_header_t]["\nKEY: "][orig_key]["\n"][header]

if ((size_t) n < c->header_start) {
ngx_log_error(NGX_LOG_CRIT, r->connection->log, 0,
"cache file \"%s\" is too small", c->file.name.data);
return NGX_DECLINED;
}

h = (ngx_http_file_cache_header_t *) c->buf->pos;

if (h->version != NGX_HTTP_CACHE_VERSION) {
ngx_log_error(NGX_LOG_INFO, r->connection->log, 0,
"cache file \"%s\" version mismatch", c->file.name.data);
return NGX_DECLINED; //如果返回这个NGX_DECLINED,会把cached置0,返回出去后只有从后端从新获取数据
}

if (h->crc32 != c->crc32 || (size_t) h->header_start != c->header_start) {
ngx_log_error(NGX_LOG_CRIT, r->connection->log, 0,
"cache file \"%s\" has md5 collision", c->file.name.data);
return NGX_DECLINED;
}

p = c->buf->pos + sizeof(ngx_http_file_cache_header_t)
+ sizeof(ngx_http_file_cache_key);

key = c->keys.elts;
for (i = 0; i < c->keys.nelts; i++) {
if (ngx_memcmp(p, key[i].data, key[i].len) != 0) {
ngx_log_error(NGX_LOG_CRIT, r->connection->log, 0,
"cache file \"%s\" has md5 collision",
c->file.name.data);
return NGX_DECLINED;
}

p += key[i].len;
}

if ((size_t) h->body_start > c->body_start) {
ngx_log_error(NGX_LOG_CRIT, r->connection->log, 0,
"cache file \"%s\" has too long header",
c->file.name.data);
return NGX_DECLINED;
}

if (h->vary_len > NGX_HTTP_CACHE_VARY_LEN) {
ngx_log_error(NGX_LOG_CRIT, r->connection->log, 0,
"cache file \"%s\" has incorrect vary length",
c->file.name.data);
return NGX_DECLINED;
}

if (h->vary_len) {
ngx_http_file_cache_vary(r, h->vary, h->vary_len, c->variant);

if (ngx_memcmp(c->variant, h->variant, NGX_HTTP_CACHE_KEY_LEN) != 0) {
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http file cache vary mismatch");
return ngx_http_file_cache_reopen(r, c);
}
}

c->buf->last += n;

c->valid_sec = h->valid_sec;
c->updating_sec = h->updating_sec;
c->error_sec = h->error_sec;
c->last_modified = h->last_modified;
c->date = h->date;
c->valid_msec = h->valid_msec;
c->body_start = h->body_start;
c->etag.len = h->etag_len;
c->etag.data = h->etag;

r->cached = 1;

cache = c->file_cache;

if (cache->sh->cold) {

ngx_shmtx_lock(&cache->shpool->mutex);

if (!c->node->exists) {
c->node->uses = 1;
c->node->body_start = c->body_start;
c->node->exists = 1;
c->node->uniq = c->uniq;
c->node->fs_size = c->fs_size;

cache->sh->size += c->fs_size;
}

ngx_shmtx_unlock(&cache->shpool->mutex);
}

now = ngx_time();

if (c->valid_sec < now) {
c->stale_updating = c->valid_sec + c->updating_sec >= now;
c->stale_error = c->valid_sec + c->error_sec >= now;

ngx_shmtx_lock(&cache->shpool->mutex);

if (c->node->updating) {
rc = NGX_HTTP_CACHE_UPDATING;

} else {
c->node->updating = 1;
c->updating = 1;
c->lock_time = c->node->lock_time;
rc = NGX_HTTP_CACHE_STALE;
}

ngx_shmtx_unlock(&cache->shpool->mutex);

ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http file cache expired: %i %T %T",
rc, c->valid_sec, now);

return rc;
}

return NGX_OK;
}


  1. 调用ngx_http_file_cache_aio_read函数读取缓存文件,读取大小为c->header_start,读取完成后,对返回值进行校验,确定是否读取成功,代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    n = ngx_http_file_cache_aio_read(r, c);

    if (n < 0) {
    return n;
    }

    if ((size_t) n < c->header_start) {
    ngx_log_error(NGX_LOG_CRIT, r->connection->log, 0,
    "cache file \"%s\" is too small", c->file.name.data);
    return NGX_DECLINED;
    }

  1. 取出缓存文件头部,并对其字段进行校验,封包过程参考ngx_http_file_cache_set_header函数
    1
    2
    3

    h = (ngx_http_file_cache_header_t *) c->buf->pos;

  2. 使用读出的缓存文件头部,更新内存中缓存节点信息
1
2
3
4
5
6
7
8
9
10
11
12
13

c->buf->last += n; //buf后续会被用来存放http头,所以这个地方移动last指针

c->valid_sec = h->valid_sec;
c->updating_sec = h->updating_sec;
c->error_sec = h->error_sec;
c->last_modified = h->last_modified;
c->date = h->date;
c->valid_msec = h->valid_msec;
c->body_start = h->body_start;
c->etag.len = h->etag_len;
c->etag.data = h->etag;

2.8 发送缓存

若2.7节读取缓存文件成功,而且缓存校验也成功,则开始发送缓存内容,该功能由ngx_http_upstream_cache_send函数实现,接下来我们看看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
static ngx_int_t
ngx_http_upstream_cache_send(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ngx_int_t rc;
ngx_http_cache_t *c;

r->cached = 1;
c = r->cache;

if (c->header_start == c->body_start) {
r->http_version = NGX_HTTP_VERSION_9;
return ngx_http_cache_send(r);
}

/* TODO: cache stack */

u->buffer = *c->buf;
u->buffer.pos += c->header_start;

ngx_memzero(&u->headers_in, sizeof(ngx_http_upstream_headers_in_t));
u->headers_in.content_length_n = -1;
u->headers_in.last_modified_time = -1;

if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
sizeof(ngx_table_elt_t))
!= NGX_OK)
{
return NGX_ERROR;
}

if (ngx_list_init(&u->headers_in.trailers, r->pool, 2,
sizeof(ngx_table_elt_t))
!= NGX_OK)
{
return NGX_ERROR;
}

rc = u->process_header(r);

if (rc == NGX_OK) {

if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
return NGX_DONE;
}

return ngx_http_cache_send(r);
}

if (rc == NGX_ERROR) {
return NGX_ERROR;
}

if (rc == NGX_AGAIN) {
rc = NGX_HTTP_UPSTREAM_INVALID_HEADER;
}

/* rc == NGX_HTTP_UPSTREAM_INVALID_HEADER */

ngx_log_error(NGX_LOG_CRIT, r->connection->log, 0,
"cache file \"%s\" contains invalid header",
c->file.name.data);

/* TODO: delete file */

return rc;
}

2.9 向上游回源

若缓存不存在,nginx则向上游回源,那么整个流程就走到ngx_http_upstream_init_request函数的#if (NGX_HTTP_CACHE)块后,我们来看看其源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269

static void
ngx_http_upstream_init_request(ngx_http_request_t *r)
{
ngx_str_t *host;
ngx_uint_t i;
ngx_resolver_ctx_t *ctx, temp;
ngx_http_cleanup_t *cln;
ngx_http_upstream_t *u;
ngx_http_core_loc_conf_t *clcf;
ngx_http_upstream_srv_conf_t *uscf, **uscfp;
ngx_http_upstream_main_conf_t *umcf;

if (r->aio) {
return;
}

u = r->upstream;

#if (NGX_HTTP_CACHE)

if (u->conf->cache) {
ngx_int_t rc;

rc = ngx_http_upstream_cache(r, u);

if (rc == NGX_BUSY) {
r->write_event_handler = ngx_http_upstream_init_request;
return;
}

r->write_event_handler = ngx_http_request_empty_handler;

if (rc == NGX_ERROR) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

if (rc == NGX_OK) {
rc = ngx_http_upstream_cache_send(r, u);

if (rc == NGX_DONE) {
return;
}

if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
rc = NGX_DECLINED;
r->cached = 0;
u->buffer.start = NULL;
u->cache_status = NGX_HTTP_CACHE_MISS;
u->request_sent = 1;
}

if (ngx_http_upstream_cache_background_update(r, u) != NGX_OK) {
rc = NGX_ERROR;
}
}

if (rc != NGX_DECLINED) {
ngx_http_finalize_request(r, rc);
return;
}
}

#endif

u->store = u->conf->store;
/*
*设置Nginx与下游客户端之间TCP连接的检查方法
*实际上,这两个方法都会通过ngx_http_upstream_check_broken_connection方法检查Nginx与下游的连接是否正常,如果出现错误,就会立即终止连接。
*/
if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
//注意这时候的r还是客户端的连接,与上游服务器的连接r还没有建立
r->read_event_handler = ngx_http_upstream_rd_check_broken_connection;
r->write_event_handler = ngx_http_upstream_wr_check_broken_connection;
}
//有接收到客户端包体,则把包体结构赋值给u->request_bufs,在后面的if (u->create_request(r) != NGX_OK) {会用到
if (r->request_body) {
u->request_bufs = r->request_body->bufs;
}

if (u->create_request(r) != NGX_OK) { //ngx_http_proxy_create_request
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

if (ngx_http_upstream_set_local(r, u, u->conf->local) != NGX_OK) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

if (u->conf->socket_keepalive) {
u->peer.so_keepalive = 1;
}

clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
/* 初始化ngx_http_upstream_t结构中成员output向下游发送响应的方式 */
u->output.alignment = clcf->directio_alignment;
u->output.pool = r->pool;
u->output.bufs.num = 1;
u->output.bufs.size = clcf->client_body_buffer_size;

if (u->output.output_filter == NULL) {
u->output.output_filter = ngx_chain_writer;
u->output.filter_ctx = &u->writer;
}

u->writer.pool = r->pool;
/* 添加用于表示上游响应的状态,例如:错误编码、包体长度等 */
if (r->upstream_states == NULL) {

r->upstream_states = ngx_array_create(r->pool, 1,
sizeof(ngx_http_upstream_state_t));
if (r->upstream_states == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

} else {

u->state = ngx_array_push(r->upstream_states);
if (u->state == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
}

cln = ngx_http_cleanup_add(r, 0);
if (cln == NULL) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

cln->handler = ngx_http_upstream_cleanup; //当请求结束时,一定会调用ngx_http_upstream_cleanup方法
cln->data = r;
u->cleanup = &cln->handler;

if (u->resolved == NULL) {

uscf = u->conf->upstream;

} else {

#if (NGX_HTTP_SSL)
u->ssl_name = u->resolved->host;
#endif

host = &u->resolved->host;

umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

uscfp = umcf->upstreams.elts;

for (i = 0; i < umcf->upstreams.nelts; i++) {

uscf = uscfp[i];

if (uscf->host.len == host->len
&& ((uscf->port == 0 && u->resolved->no_port)
|| uscf->port == u->resolved->port)
&& ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
{
goto found;
}
}

if (u->resolved->sockaddr) {

if (u->resolved->port == 0
&& u->resolved->sockaddr->sa_family != AF_UNIX)
{
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no port in upstream \"%V\"", host);
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

if (ngx_http_upstream_create_round_robin_peer(r, u->resolved)
!= NGX_OK)
{
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

ngx_http_upstream_connect(r, u);

return;
}

if (u->resolved->port == 0) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no port in upstream \"%V\"", host);
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

temp.name = *host;
// 初始化域名解析器
ctx = ngx_resolve_start(clcf->resolver, &temp);
if (ctx == NULL) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

if (ctx == NGX_NO_RESOLVER) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"no resolver defined to resolve %V", host);

ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY);
return;
}

ctx->name = *host;
ctx->handler = ngx_http_upstream_resolve_handler; //设置域名解析完成后的回调函数。
ctx->data = r;
ctx->timeout = clcf->resolver_timeout;

u->resolved->ctx = ctx;
//开始域名解析,没有完成也会返回的。
if (ngx_resolve_name(ctx) != NGX_OK) {
u->resolved->ctx = NULL;
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

return; // 域名还没有解析完成,则直接返回
}

found:

if (uscf == NULL) {
ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
"no upstream configuration");
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

u->upstream = uscf;

#if (NGX_HTTP_SSL)
u->ssl_name = uscf->host;
#endif

if (uscf->peer.init(r, uscf) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u,
NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

u->peer.start_time = ngx_current_msec;

if (u->conf->next_upstream_tries
&& u->peer.tries > u->conf->next_upstream_tries)
{
u->peer.tries = u->conf->next_upstream_tries;
}

ngx_http_upstream_connect(r, u);
}

2.10 接受上游返回的数据

该功能由ngx_http_upstream_send_response函数实现,我们来看看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343

static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
ssize_t n;
ngx_int_t rc;
ngx_event_pipe_t *p;
ngx_connection_t *c;
ngx_http_core_loc_conf_t *clcf;

rc = ngx_http_send_header(r);

if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}

u->header_sent = 1;

if (u->upgrade) {

#if (NGX_HTTP_CACHE)

if (r->cache) {
ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
}

#endif

ngx_http_upstream_upgrade(r, u);
return;
}

c = r->connection;

if (r->header_only) {

if (!u->buffering) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}

if (!u->cacheable && !u->store) {
ngx_http_upstream_finalize_request(r, u, rc);
return;
}

u->pipe->downstream_error = 1;
}

if (r->request_body && r->request_body->temp_file
&& r == r->main && !r->preserve_body
&& !u->conf->preserve_output)
{
ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
}

clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

if (!u->buffering) {

#if (NGX_HTTP_CACHE)

if (r->cache) {
ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
}

#endif

if (u->input_filter == NULL) {
u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
u->input_filter = ngx_http_upstream_non_buffered_filter;
u->input_filter_ctx = r;
}

u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
r->write_event_handler =
ngx_http_upstream_process_non_buffered_downstream;

r->limit_rate = 0;

if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

if (clcf->tcp_nodelay && ngx_tcp_nodelay(c) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

n = u->buffer.last - u->buffer.pos;

if (n) {
u->buffer.last = u->buffer.pos;

u->state->response_length += n;

if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

ngx_http_upstream_process_non_buffered_downstream(r);

} else {
u->buffer.pos = u->buffer.start;
u->buffer.last = u->buffer.start;

if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

if (u->peer.connection->read->ready || u->length == 0) {
ngx_http_upstream_process_non_buffered_upstream(r, u);
}
}

return;
}

/* TODO: preallocate event_pipe bufs, look "Content-Length" */

#if (NGX_HTTP_CACHE)

if (r->cache && r->cache->file.fd != NGX_INVALID_FILE) {
ngx_pool_run_cleanup_file(r->pool, r->cache->file.fd);
r->cache->file.fd = NGX_INVALID_FILE;
}

switch (ngx_http_test_predicates(r, u->conf->no_cache)) {

case NGX_ERROR:
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;

case NGX_DECLINED:
u->cacheable = 0;
break;

default: /* NGX_OK */

if (u->cache_status == NGX_HTTP_CACHE_BYPASS) {

/* create cache if previously bypassed */

if (ngx_http_file_cache_create(r) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}

break;
}

if (u->cacheable) {
time_t now, valid;

now = ngx_time();

valid = r->cache->valid_sec;

if (valid == 0) {
valid = ngx_http_file_cache_valid(u->conf->cache_valid,
u->headers_in.status_n);
if (valid) {
r->cache->valid_sec = now + valid;
}
}
//如果没有配置proxy_cache_valid,valid的值为0
if (valid) {
r->cache->date = now;
r->cache->body_start = (u_short) (u->buffer.pos - u->buffer.start);

if (u->headers_in.status_n == NGX_HTTP_OK
|| u->headers_in.status_n == NGX_HTTP_PARTIAL_CONTENT)
{
r->cache->last_modified = u->headers_in.last_modified_time;

if (u->headers_in.etag) {
r->cache->etag = u->headers_in.etag->value;

} else {
ngx_str_null(&r->cache->etag);
}

} else {
r->cache->last_modified = -1;
ngx_str_null(&r->cache->etag);
}

if (ngx_http_file_cache_set_header(r, u->buffer.start) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

} else {
u->cacheable = 0;
}
}

ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http cacheable: %d", u->cacheable);

if (u->cacheable == 0 && r->cache) {
ngx_http_file_cache_free(r->cache, u->pipe->temp_file);
}

if (r->header_only && !u->cacheable && !u->store) {
ngx_http_upstream_finalize_request(r, u, 0);
return;
}

#endif

p = u->pipe;

p->output_filter = ngx_http_upstream_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
p->bufs = u->conf->bufs;
p->busy_size = u->conf->busy_buffers_size;
p->upstream = u->peer.connection;
p->downstream = c;
p->pool = r->pool;
p->log = c->log;
p->limit_rate = u->conf->limit_rate;
p->start_sec = ngx_time();

p->cacheable = u->cacheable || u->store;

p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
if (p->temp_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

p->temp_file->file.fd = NGX_INVALID_FILE;
p->temp_file->file.log = c->log;
p->temp_file->path = u->conf->temp_path; //u->conf->temp_path的值在ngx_http_proxy_handler函数中赋值,赋值流程为ngx_http_proxy_merge_loc_conf->ngx_conf_merge_path_value
p->temp_file->pool = r->pool;

if (p->cacheable) {
p->temp_file->persistent = 1;

#if (NGX_HTTP_CACHE)
if (r->cache && !r->cache->file_cache->use_temp_path) {
p->temp_file->path = r->cache->file_cache->path;
p->temp_file->file.name = r->cache->file.name;
}
#endif

} else {
p->temp_file->log_level = NGX_LOG_WARN;
p->temp_file->warn = "an upstream response is buffered "
"to a temporary file";
}

p->max_temp_file_size = u->conf->max_temp_file_size;
p->temp_file_write_size = u->conf->temp_file_write_size;

#if (NGX_THREADS)
if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
p->thread_handler = ngx_http_upstream_thread_handler;
p->thread_ctx = r;
}
#endif

p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

p->preread_bufs->buf = &u->buffer;
p->preread_bufs->next = NULL;
u->buffer.recycled = 1;

p->preread_size = u->buffer.last - u->buffer.pos;

if (u->cacheable) {

p->buf_to_file = ngx_calloc_buf(r->pool);
if (p->buf_to_file == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

p->buf_to_file->start = u->buffer.start;
p->buf_to_file->pos = u->buffer.start;
p->buf_to_file->last = u->buffer.pos;
p->buf_to_file->temporary = 1;
}

if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
/* the posted aio operation may corrupt a shadow buffer */
p->single_buf = 1;
}

/* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */
p->free_bufs = 1;

/*
* event_pipe would do u->buffer.last += p->preread_size
* as though these bytes were read
*/
u->buffer.last = u->buffer.pos;

if (u->conf->cyclic_temp_file) {

/*
* we need to disable the use of sendfile() if we use cyclic temp file
* because the writing a new data may interfere with sendfile()
* that uses the same kernel file pages (at least on FreeBSD)
*/

p->cyclic_temp_file = 1;
c->sendfile = 0;

} else {
p->cyclic_temp_file = 0;
}

p->read_timeout = u->conf->read_timeout;
p->send_timeout = clcf->send_timeout;
p->send_lowat = clcf->send_lowat;

p->length = -1;

if (u->input_filter_init
&& u->input_filter_init(p->input_ctx) != NGX_OK)
{
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}

u->read_event_handler = ngx_http_upstream_process_upstream;
r->write_event_handler = ngx_http_upstream_process_downstream;

ngx_http_upstream_process_upstream(r, u);
}

读取数据的具体操作在ngx_http_upstream_process_upstream函数中实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

static void
ngx_http_upstream_process_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u)
{
ngx_event_t *rev;
ngx_event_pipe_t *p;
ngx_connection_t *c;

c = u->peer.connection;
p = u->pipe;
rev = c->read;

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream process upstream");

c->log->action = "reading upstream";

if (rev->timedout) {

p->upstream_error = 1;
ngx_connection_error(c, NGX_ETIMEDOUT, "upstream timed out");

} else { //请求没有超时,那么对后端,处理一下读事件。ngx_event_pipe开始处理

if (rev->delayed) {

ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
"http upstream delayed");

if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
}

return;
}

if (ngx_event_pipe(p, 0) == NGX_ABORT) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
return;
}
}
//注意走到这里的时候,后端发送的头部行信息已经在前面的ngx_http_upstream_send_response->ngx_http_send_header已经把头部行部分发送给客户端了
//该函数处理的只是后端放回过来的网页包体部分
ngx_http_upstream_process_request(r, u);
}

接下来分析一下ngx_event_pipe这个函数,在有buffering的时候,使用event_pipe进行数据的转发,调用ngx_event_pipe_write_to_downstream函数读取数据,或者发送数据给客户端。
ngx_event_pipe将upstream响应发送回客户端。do_write代表是否要往客户端发送,写数据。如果设置了,那么会先发给客户端,再读upstream数据,当然,如果读取了数据,也会调用这里的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
ngx_int_t
ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
{
ngx_int_t rc;
ngx_uint_t flags;
ngx_event_t *rev, *wev;

for ( ;; ) {
if (do_write) {
p->log->action = "sending to client";

rc = ngx_event_pipe_write_to_downstream(p);

if (rc == NGX_ABORT) {
return NGX_ABORT;
}

if (rc == NGX_BUSY) {
return NGX_OK;
}
}

p->read = 0;
p->upstream_blocked = 0;

p->log->action = "reading upstream";

if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
return NGX_ABORT;
}

if (!p->read && !p->upstream_blocked) {
break;
}

do_write = 1;
}

if (p->upstream->fd != (ngx_socket_t) -1) {
rev = p->upstream->read;

flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;

if (ngx_handle_read_event(rev, flags) != NGX_OK) {
return NGX_ABORT;
}

if (!rev->delayed) {
if (rev->active && !rev->ready) {
ngx_add_timer(rev, p->read_timeout);

} else if (rev->timer_set) {
ngx_del_timer(rev);
}
}
}

if (p->downstream->fd != (ngx_socket_t) -1
&& p->downstream->data == p->output_ctx)
{
wev = p->downstream->write;
if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
return NGX_ABORT;
}

if (!wev->delayed) {
if (wev->active && !wev->ready) {
ngx_add_timer(wev, p->send_timeout);

} else if (wev->timer_set) {
ngx_del_timer(wev);
}
}
}

return NGX_OK;
}

这个函数中最重要的就是ngx_event_pipe_write_to_downstream跟ngx_event_pipe_read_upstream,这两个函数将处理来自上游的数据以及将数据转发到客户端。