摘要

最近收集了数据供个人实验使用,后期再展示实验结果。

用利用java 多线程,生产者和消费者模式一周爬取了内涵段子1300w+条数据,写了三个文件,每个文件都有1.5G左右。

程序的整体思路:

一个线程(生产者)爬取json数据并格式化成我想要的json格式(去掉一些无用信息),然后三个线程(消费者)各自写进自己的文件,每行是一条json。还有把所有的任务交给线程池ExecutorService去执行多线程,生产者和消费者的公共容器使用的是LinkedBlockingQueue。

这里解释一下,为什么是一个生产者?

原因是爬取的json数据里有请求下一个json数据的max_time变量,这导致无法使用多个生产者。

完整实现

从服务端获取json数据时中文unicode编码用fastjson解决的。我在上一篇文章中提到了解决方案。

Collect类

下面是Collect类,生产者和消费者都要操作的类,这里公用容器用LinkedBlockingQueue,每次查询都会返回20条数据,所以我把每次的20条数据写进list,容器能存储100个list。

日志里保存了获取下一条json的max_time变量,供观察运行情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package com.im_k.collectSet;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.log4j.Logger;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

/**
* @author dalai
* @date 2017-10-20
*
*/

public class Collect {
/*
* 本程序内容:
* 从http://neihanshequ.com/joke/?is_json=1&app_name=neihanshequ_web&max_time=
* 1508400711 获取json,解析出用户id、用户名、和内容,并且构造成json写到文件,每个段子写一行。
*/

static String homeUrl = "http://neihanshequ.com/joke/?is_json=1&app_name=neihanshequ_web&max_time=";
static String max_time = "1508400711";

static Logger logger = Logger.getLogger(Collect.class);

private final int MAX_SIZE = 100;

private LinkedBlockingQueue<List<String>> lbq = new LinkedBlockingQueue<List<String>>(MAX_SIZE);

public Collect() {

}

// 从服务端获取json
private List<String> getContents() {

InputStream in = null;
InputStreamReader reader = null;
BufferedReader bufferedReader = null;
List<String> jsonList = new ArrayList<String>();

try {
URL url = new URL(homeUrl + max_time);
in = url.openStream();
if (in != null) {
reader = new InputStreamReader(in, "utf-8");
} else {
return jsonList;
}
bufferedReader = new BufferedReader(reader);
String temp = null;
String data = "";
while ((temp = bufferedReader.readLine()) != null) {
data += temp;
}

Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))");
Matcher matcher = pattern.matcher(data);
char ch;

while (matcher.find()) {
ch = (char) Integer.parseInt(matcher.group(2), 16);
data = data.replace(matcher.group(1), ch + "");
}

JSONObject jsonObject = JSON.parseObject(data);

jsonList = dataOperation(jsonObject);

} catch (MalformedURLException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
reader.close();
bufferedReader.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}

return jsonList;
}

// 解析,构造json格式
private List<String> dataOperation(JSONObject jsonObject) {

List<String> jsonList = new ArrayList<String>();

if (jsonObject == null)
return jsonList;

max_time = ((JSONObject) jsonObject.get("data")).getString("max_time");
logger.info("max_time=" + max_time);
// System.out.println("max_time=" + max_time);

JSONArray jsonArray = (JSONArray) ((JSONObject) jsonObject.get("data")).get("data");
for (int idx = 0, len = jsonArray.size(); idx < len; idx++) {

String name = (String) ((JSONObject) ((JSONObject) ((JSONObject) jsonArray.get(idx)).get("group"))
.get("user")).getString("name");
String id = (String) ((JSONObject) ((JSONObject) ((JSONObject) jsonArray.get(idx)).get("group"))
.get("user")).getString("user_id");
String content = (String) ((JSONObject) ((JSONObject) jsonArray.get(idx)).get("group")).getString("text");

String jsonStr = "{\"name\":\"" + name + "\",\"id\":\"" + id + "\",\"content\":\"" + content + "\"}";

jsonList.add(jsonStr);
}

return jsonList;
}

// 生产者
public void produce() {

List<String> jsonList = getContents();

// if (lbq.size() == MAX_SIZE) {
// System.out.println("缓冲区容量:" + MAX_SIZE + "\t 暂时不能执行任务!");
// }

try {
lbq.put(jsonList);
} catch (Exception e) {
e.printStackTrace();
}
}

// 消费者
public void consume(BufferedWriter bw) {

// if (lbq.size() == 0) {
// System.out.println("缓冲区容量:0 \t 暂时不能执行任务!");
// }

List<String> jsonList = null;
for (int i = 0; i < lbq.size(); i++) {
try {
jsonList = lbq.take();
writeToFile(jsonList, bw);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

// 写进文件中
private void writeToFile(List<String> jsonList, BufferedWriter bw) {
if (jsonList == null || jsonList.size() <= 0)
return;

try {

for (int i = 0, len = jsonList.size(); i < len; i++) {
bw.write(jsonList.get(i));
bw.newLine();
bw.flush();
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

// test
public static void main(String[] args) throws UnsupportedEncodingException {

Collect collect = new Collect();
collect.getContents();

}
}

多线程 生产者-消费者

每个线程都是死循环的,我写死了,直到我手动杀掉进程。

还有把所有线程提交给线程池去运行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.im_k.collectSet;

import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @auther dalai
* @date 2017-10-20
*
*/

//生产者
class Producer implements Runnable {

private Collect collect;

public Producer(Collect collect) {
this.collect = collect;
}

public void run() {
while (true) {
collect.produce();
}
}
}

//消费者
class Consumer implements Runnable {

private Collect collect;

public Consumer(Collect collect) {
this.collect = collect;
}

public void run() {

BufferedWriter bw = null;

String fileName = Thread.currentThread().getName();
fileName = "./samples." + fileName + ".txt";
System.out.println(fileName);
try {
bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileName), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
}

while (true) {
collect.consume(bw);
}
}
}

//多线程类
public class MultiThread {

public static void main(String[] args) {

Collect collect = new Collect();

// 一个生产者
Producer p1 = new Producer(collect);

// 三个消费者
Consumer c1 = new Consumer(collect);
Consumer c2 = new Consumer(collect);
Consumer c3 = new Consumer(collect);

ExecutorService executorService = Executors.newFixedThreadPool(4);

executorService.execute(p1);
executorService.execute(c1);
executorService.execute(c2);
executorService.execute(c3);

executorService.shutdown();
}
}

完结。