My Little World

一个实现流通信的案例

背景

前端包括node层和纯前端层,需要请求第三方http接口在页面实现chatGTP的打字机效果

方案

  1. 在node层调用第三方http接口,避免跨域问题
  2. 由于第三方接口为流式接口,从node层发出请求再转发到前端也需要进行流式通信
  3. 前端层对返回的流式数据进行处理后更新数据呈现在页面上

实现

  1. 前端层使用fetch进行请求,使用ReadableStream进行流式处理
  2. node层使用axios进行请求,使用stream进行流式处理

node层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import axios from 'axios';
import { PassThrough } from 'stream';

export function faqStream(body?: any): Promise<any> {
const ctx = useContext<HttpContext>();
ctx.set({
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
'Content-Type': 'application/octet-stream' // 表示返回数据是个 stream
});
const stream = new PassThrough();
ctx.body = stream;
// 发起第三方请求
const headers = {
'Content-Type': 'application/json'
};
const url = 'http://vvvv.xxxx.net/aiBot/oncall_response_stream';
axios
.post(url, ctx.request.body, { headers: headers, responseType: 'stream' })
.then((response) => {
if (response.status !== 200) {
console.error('Error status:', response.status);
return;
}
response.data.on('data', (chunk) => {
chunk
.toString()
.split('\n\n')
.filter((item) => item)
.forEach((chunkStr) => {
let chunkJson = {};
try {
chunkJson = JSON.parse(chunkStr);
} catch (error) {
console.error('Error parse:', error);
console.error('Error chunkStr:', chunkStr);
console.error('Error origin chunk:', chunk.toString());
}
if (chunkJson?.data?.chunk) {
// 拿到有效数据后,传给前端
stream.write(chunkJson.data.chunk);
}
});
});
response.data.on('end', () => {
// 第三方请求流结束后,关闭向前端写的流
stream.end();
});
})
.catch((error) => {
console.error('Error all:', error);
});
}

前端层实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
  import { useState, useCallback, useRef } from 'react';

const [listLoading, setListLoading] = useState(0);
const [hasReport, setHasReport] = useState(-1);
const [AiRemoteData, setAIRemoteData] = useState('');
const AiRequestController = useRef();
const { addThrottle } = useThrottleFunc();

// 拉取Ai 回答
const getAIRemoteData = useCallback(
addThrottle(
async () => {
const { keyWord } = query;
if (!keyWord) {
return;
}
try {
setListLoading((val) => val + 1);
if (AiRequestController.current) {
// 如果当前请求存在,则取消当前请求
AiRequestController.current.abort();
}
AiRequestController.current = new AbortController();
const jwtToken = await getJwt();
// 1. 创建一个新的请求
const response = await fetch(
`https://hahahaha.net/api/diagnosisBasic/faqStream`,
{
method: 'POST',
body: JSON.stringify({
user_id: userInfo.id,
query: keyWord,
class: ''
}),
headers: {
'x-jwt-token': jwtToken
},
signal: AiRequestController.current.signal
}
);
const reader = response.body.getReader(); // 获取reader
const decoder = new TextDecoder(); // 文本解码器
let answer = ''; // 存储答案
// 2. 循环取值
while (true) {
// 取值, value 是后端返回流信息, done 表示后端结束流的输出
const { value, done } = await reader.read();
if (done) {
break;
}
// 对 value 进行解码
const val = decoder.decode(value);
if (!answer) {
setListLoading((count) => count - 1);
setHasReport(-1);
}
answer += val;
setAIRemoteData(answer);
}
} catch {
setAIRemoteData('');
console.error('数据解析出错');
} finally {
setListLoading((val) => val - 1);
}
},
500,
'getAIRemoteData'
),
[query.keyWord]
);

番外: 全局节流函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
const throttleTimerList: { [key: string]: Timeout | null } = {};
export const useThrottleFunc = () => {
const timer = useRef();
const addThrottle = (
fn: (params?: LooseObject | undefined) => void,
waitTime?: number,
timerKey?: string
) => {
const timerFlag = timerKey || 'getRemoteData';
throttleTimerList[timerFlag] = null;
return (params?: LooseObject | undefined) => {
if (throttleTimerList[timerFlag]) {
clearTimeout(throttleTimerList[timerFlag]);
}
throttleTimerList[timerFlag] = setTimeout(() => {
fn(params);
clearTimeout(throttleTimerList[timerFlag]);
throttleTimerList[timerFlag] = null;
}, waitTime || 500);
};
};

useEffect(
() => () => {
Object.keys(throttleTimerList).forEach((key) => {
if (throttleTimerList[key]) {
clearTimeout(throttleTimerList[key]);
}
delete throttleTimerList[key];
});
},
[]
);
return {
addThrottle,
throttleTimer: timer
};
};

相关知识链接

NODE-Stream
NODE-ReadableStream
ReadableStream
application/octet-stream vs text/event-stream
AbortController
fetch获取流式数据相关问题
在 Koa 中基于 gpt-3.5 模型实现一个最基本的流式问答 DEMO