摘要
最近收集了数据供个人实验使用,后期再展示实验结果。
利用java多线程和生产者-消费者模式,一周爬取了内涵段子1300w+条数据,写入了三个文件,每个文件都有1.5G左右。
程序的整体思路:
一个线程(生产者)爬取json数据并格式化成我想要的json格式(去掉一些无用信息),然后三个线程(消费者)各自写进自己的文件,每行是一条json。另外,所有的任务都交给线程池ExecutorService去执行,生产者和消费者的公共容器使用的是LinkedBlockingQueue。
这里解释一下,为什么是一个生产者?
原因是爬取的json数据里有请求下一个json数据的max_time变量,这导致无法使用多个生产者。
完整实现
从服务端获取json数据时,中文unicode编码的问题是用fastjson解决的,我在上一篇文章中提到了解决方案。
Collect类
下面是Collect类,生产者和消费者都要操作的类,这里公用容器用LinkedBlockingQueue,每次查询都会返回20条数据,所以我把每次的20条数据写进list,容器能存储100个list。
日志里保存了获取下一条json的max_time变量,供观察运行情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198
| package com.im_k.collectSet;
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.UnsupportedEncodingException; import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern;
import org.apache.log4j.Logger;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject;
public class Collect {
static String homeUrl = "http://neihanshequ.com/joke/?is_json=1&app_name=neihanshequ_web&max_time="; static String max_time = "1508400711";
static Logger logger = Logger.getLogger(Collect.class);
private final int MAX_SIZE = 100;
private LinkedBlockingQueue<List<String>> lbq = new LinkedBlockingQueue<List<String>>(MAX_SIZE);
public Collect() {
}
private List<String> getContents() {
InputStream in = null; InputStreamReader reader = null; BufferedReader bufferedReader = null; List<String> jsonList = new ArrayList<String>();
try { URL url = new URL(homeUrl + max_time); in = url.openStream(); if (in != null) { reader = new InputStreamReader(in, "utf-8"); } else { return jsonList; } bufferedReader = new BufferedReader(reader); String temp = null; String data = ""; while ((temp = bufferedReader.readLine()) != null) { data += temp; }
Pattern pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))"); Matcher matcher = pattern.matcher(data); char ch;
while (matcher.find()) { ch = (char) Integer.parseInt(matcher.group(2), 16); data = data.replace(matcher.group(1), ch + ""); }
JSONObject jsonObject = JSON.parseObject(data);
jsonList = dataOperation(jsonObject);
} catch (MalformedURLException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { if (in != null) { in.close(); reader.close(); bufferedReader.close(); } } catch (IOException e) { e.printStackTrace(); } }
return jsonList; }
private List<String> dataOperation(JSONObject jsonObject) {
List<String> jsonList = new ArrayList<String>();
if (jsonObject == null) return jsonList;
max_time = ((JSONObject) jsonObject.get("data")).getString("max_time"); logger.info("max_time=" + max_time);
JSONArray jsonArray = (JSONArray) ((JSONObject) jsonObject.get("data")).get("data"); for (int idx = 0, len = jsonArray.size(); idx < len; idx++) {
String name = (String) ((JSONObject) ((JSONObject) ((JSONObject) jsonArray.get(idx)).get("group")) .get("user")).getString("name"); String id = (String) ((JSONObject) ((JSONObject) ((JSONObject) jsonArray.get(idx)).get("group")) .get("user")).getString("user_id"); String content = (String) ((JSONObject) ((JSONObject) jsonArray.get(idx)).get("group")).getString("text");
String jsonStr = "{\"name\":\"" + name + "\",\"id\":\"" + id + "\",\"content\":\"" + content + "\"}";
jsonList.add(jsonStr); }
return jsonList; }
public void produce() {
List<String> jsonList = getContents();
try { lbq.put(jsonList); } catch (Exception e) { e.printStackTrace(); } }
public void consume(BufferedWriter bw) {
List<String> jsonList = null; for (int i = 0; i < lbq.size(); i++) { try { jsonList = lbq.take(); writeToFile(jsonList, bw); } catch (InterruptedException e) { e.printStackTrace(); } } }
private void writeToFile(List<String> jsonList, BufferedWriter bw) { if (jsonList == null || jsonList.size() <= 0) return;
try {
for (int i = 0, len = jsonList.size(); i < len; i++) { bw.write(jsonList.get(i)); bw.newLine(); bw.flush(); } } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } }
public static void main(String[] args) throws UnsupportedEncodingException {
Collect collect = new Collect(); collect.getContents();
} }
|
多线程 生产者-消费者
每个线程都是死循环的,我写死了,直到我手动杀掉进程。
另外,所有线程都提交给线程池去运行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| package com.im_k.collectSet;
import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
/**
 * Task that keeps asking the shared {@link Collect} instance to produce, without end.
 */
class Producer implements Runnable {
    private final Collect collect;

    /**
     * @param collect the shared instance whose {@code produce()} is invoked repeatedly
     */
    public Producer(Collect collect) {
        this.collect = collect;
    }

    /** Calls {@code produce()} in an endless loop; this method never returns normally. */
    @Override
    public void run() {
        for (;;) {
            collect.produce();
        }
    }
}
/**
 * Task that drains the shared {@link Collect} instance into a file named after the
 * thread that runs it ({@code ./samples.<thread name>.txt}).
 */
class Consumer implements Runnable {
    private Collect collect;

    /**
     * @param collect the shared instance whose {@code consume(BufferedWriter)} is invoked
     */
    public Consumer(Collect collect) {
        this.collect = collect;
    }

    /**
     * Opens the output file and then consumes in an endless loop.
     *
     * <p>If the file cannot be opened the task ends right away. Continuing with a null
     * writer would let this thread take a batch off the queue and then fail while
     * writing it, losing that batch.
     */
    @Override
    public void run() {
        String fileName = "./samples." + Thread.currentThread().getName() + ".txt";
        System.out.println(fileName);

        BufferedWriter bw;
        try {
            bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(fileName), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return;
        } catch (FileNotFoundException e) {
            e.printStackTrace();
            return;
        }

        while (true) {
            collect.consume(bw);
        }
    }
}
/**
 * Entry point: runs one {@link Producer} and three {@link Consumer} tasks, all sharing a
 * single {@link Collect} instance, on a fixed pool with one thread per task.
 */
public class MultiThread {
    public static void main(String[] args) {
        final int consumerCount = 3;
        Collect shared = new Collect();

        // Slot 0 is the producer; the remaining slots are consumers. The submission
        // order below is therefore producer first, then the consumers.
        Runnable[] tasks = new Runnable[consumerCount + 1];
        tasks[0] = new Producer(shared);
        for (int slot = 1; slot <= consumerCount; slot++) {
            tasks[slot] = new Consumer(shared);
        }

        ExecutorService pool = Executors.newFixedThreadPool(tasks.length);
        for (Runnable task : tasks) {
            pool.execute(task);
        }
        // No further tasks are accepted; the ones already submitted keep running.
        pool.shutdown();
    }
}
|
完结。