当前位置: 首页 > news >正文

AIGC: 关于ChatGPT中基于API实现一个StreamClient流式客户端

Java版GPT的StreamClient

  • 可作为其他编程语言的参考
  • 注意: 下面包名中的 xxx 可以换成自己的
  • 代码基于java,来源于网络,可修改成其他编程语言实现
  • 参考前文: https://blog.csdn.net/Tyro_java/article/details/134748994

1 )核心代码结构设计

  • src
    • main
      • java
        • com.xxx.gpt.client
          • listener
            • AbstractStreamListener.java
            • ConsoleStreamListener.java
          • ChatGPTStreamClient.java
    • test
      • java
        • com.xxx.gpt.client.test
          • StreamClientTest.java

2 )相关程序如下

  • 前文,通过我们开发的Client能够正常的和 Open AI 进行交互,能够去调用GPT的API
  • 通过API将我们的 message 请求发送给GPT并且正常的接收到了GPT对我们的返回
  • 在前面我们去浏览 GPT 它的API的时候,我们发现它是支持流式访问的
  • 我们可以开发一个Stream的Client,能够支持流式的接收GPT的响应
  • 流式的Client在很多场景下也是非常有必要的
  • 首先需要去先创建一个listener,去流式的接收GPT的返回, 我们实现一个 AbstractStreamListener 类 和 ConsoleStreamListener 类
  • 需要继承于 EventSourceListener, 内部添加几个方法
    • onOpen
    • onEvent, 可以获取到流失的输入,这个是重点
    • onClosed
    • onFailure

AbstractStreamListener.java

package com.xxx.gpt.client.listener;import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.xxx.gpt.client.entity.ChatChoice;
import com.xxx.gpt.client.entity.ChatCompletionResponse;
import com.xxx.gpt.client.entity.Message;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;@Slf4j
public abstract class AbstractStreamListener extends EventSourceListener {protected String lastMessage = "";/*** Called when all new message are received.** @param message the new message*/@Setter@Getterprotected Consumer<String> onComplate = s -> {};/*** Called when a new message is received.* 收到消息 单个字** @param message the new message*/public abstract void onMsg(String message);/*** Called when an error occurs.* 出错时调用** @param throwable the throwable that caused the error* @param response  the response associated with the error, if any*/public abstract void onError(Throwable throwable, String response);@Overridepublic void onOpen(EventSource eventSource, Response response) {// do nothing}@Overridepublic void onClosed(EventSource eventSource) {// do nothing}@Overridepublic void onEvent(EventSource eventSource, String id, String type, String data) {if (data.equals("[DONE]")) {onComplate.accept(lastMessage);return;}// 将数据反序列化为 GPT的 responseChatCompletionResponse response = JSON.parseObject(data, ChatCompletionResponse.class);// 获取GPT的返回,读取JsonList<ChatChoice> choices = response.getChoices();// 为空则 returnif (choices == null || choices.isEmpty()) {return;}// 获取流式信息Message delta = choices.get(0).getDelta();String text = delta.getContent();if (text != null) {lastMessage += text;onMsg(text);}}@SneakyThrows@Overridepublic void onFailure(EventSource eventSource, Throwable throwable, Response response) {try {log.error("Stream connection error: {}", throwable);String responseText = "";if (Objects.nonNull(response)) {responseText = response.body().string();}log.error("response:{}", responseText);String forbiddenText = "Your access was terminated due to violation of our policies";if (StrUtil.contains(responseText, forbiddenText)) {log.error("Chat session has been terminated due to policy violation");log.error("检测到号被封了");}String overloadedText = "That model is currently overloaded with other requests.";if (StrUtil.contains(responseText, overloadedText)) {log.error("检测到官方超载了,赶紧优化你的代码,做重试吧");}this.onError(throwable, responseText);} catch (Exception e) {log.warn("onFailure error:{}", e);// do nothing} finally {eventSource.cancel();}}
}

ConsoleStreamListener.java

package com.xxx.gpt.client.listener;import lombok.extern.slf4j.Slf4j;@Slf4j
public class ConsoleStreamListener extends AbstractStreamListener {@Overridepublic void onMsg(String message) {System.out.print(message);}@Overridepublic void onError(Throwable throwable, String response) {}
}
  • 再创建一个 ChatGPTStreamClient 类
    • 添加如下相关属性
    • 添加 init 方法
    • 完成 streamChatCompletion 方法

ChatGPTStreamClient.java

package com.xxx.gpt.client;import cn.hutool.core.util.RandomUtil;
import cn.hutool.http.ContentType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xxx.gpt.client.entity.ChatCompletion;
import com.xxx.gpt.client.entity.Message;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;import java.net.Proxy;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;@Slf4j
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChatGPTStreamClient {private String apiKey;private List<String> apiKeyList;private OkHttpClient okHttpClient;/*** 连接超时*/@Builder.Defaultprivate long timeout = 90;/*** 网络代理*/@Builder.Defaultprivate Proxy proxy = Proxy.NO_PROXY;/*** 反向代理*/@Builder.Defaultprivate String apiHost = ChatApi.CHAT_GPT_API_HOST;/*** 初始化*/public ChatGPTStreamClient init() {OkHttpClient.Builder client = new OkHttpClient.Builder();client.connectTimeout(timeout, TimeUnit.SECONDS);client.writeTimeout(timeout, TimeUnit.SECONDS);client.readTimeout(timeout, TimeUnit.SECONDS);if (Objects.nonNull(proxy)) {client.proxy(proxy);}okHttpClient = client.build();return this;}/*** 流式输出*/public void streamChatCompletion(ChatCompletion chatCompletion,EventSourceListener eventSourceListener) {chatCompletion.setStream(true);try {EventSource.Factory factory = EventSources.createFactory(okHttpClient);ObjectMapper mapper = new ObjectMapper();String requestBody = mapper.writeValueAsString(chatCompletion);String key = apiKey;if (apiKeyList != null && !apiKeyList.isEmpty()) {key = RandomUtil.randomEle(apiKeyList);}Request request = new Request.Builder().url(apiHost + "v1/chat/completions").post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()),requestBody)).header("Authorization", "Bearer " + key).build();factory.newEventSource(request, eventSourceListener);} catch (Exception e) {log.error("请求出错:{}", e);}}/*** 流式输出*/public void streamChatCompletion(List<Message> messages,EventSourceListener eventSourceListener) {ChatCompletion chatCompletion = ChatCompletion.builder().messages(messages).stream(true).build();streamChatCompletion(chatCompletion, eventSourceListener);}
}

再添加一个测试方法 StreamClientTest.java

package com.xxx.gpt.client.test;import com.xxx.gpt.client.ChatGPTStreamClient;
import com.xxx.gpt.client.entity.ChatCompletion;
import com.xxx.gpt.client.entity.Message;
import com.xxx.gpt.client.listener.ConsoleStreamListener;
import com.xxx.gpt.client.util.Proxys;
import org.junit.Before;
import org.junit.Test;import java.net.Proxy;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;public class StreamClientTest {private ChatGPTStreamClient chatGPTStreamClient;@Beforepublic void before() {Proxy proxy = Proxys.http("127.0.0.1", 7890);chatGPTStreamClient = ChatGPTStreamClient.builder().apiKey("sk-6kchadsfsfkc3aIs66ct") // 填入自己的 key.proxy(proxy).timeout(600).apiHost("https://api.openai.com/").build().init();}@Testpublic void chatCompletions() {ConsoleStreamListener listener = new ConsoleStreamListener();Message message = Message.of("写一段七言绝句诗");ChatCompletion chatCompletion = ChatCompletion.builder().messages(Arrays.asList(message)).build();chatGPTStreamClient.streamChatCompletion(chatCompletion, listener);try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}}
}
  • 这样,程序基本已经完成了
  • 这里构建了一个流式访问的参数,然后去调用GPT的API实现了流式的输出
  • 可参考以上 Java 版实现, 去实现其他语言版本的 StreamClient

相关文章:

AIGC: 关于ChatGPT中基于API实现一个StreamClient流式客户端

Java版GPT的StreamClient 可作为其他编程语言的参考注意: 下面包名中的 xxx 可以换成自己的代码基于java&#xff0c;来源于网络&#xff0c;可修改成其他编程语言实现参考前文: https://blog.csdn.net/Tyro_java/article/details/134748994 1 &#xff09;核心代码结构设计 …...

FutureTask

1. 作用 异步操作获取执行结果取消任务执行&#xff0c;判断是否取消执行判断任务执行是否完毕 2. demo public static void main(String[] args) throws Exception {Callable<String> callable () -> search();FutureTask<String> futureTasknew FutureTask&…...

【力扣热题100】207. 课程表 python 拓扑排序

【力扣热题100】207. 课程表 python 拓扑排序 写在最前面207. 课程表解决方案&#xff1a;判断是否可以完成所有课程的学习方法&#xff1a;拓扑排序实现步骤Python 实现性能分析结论 写在最前面 刷一道力扣热题100吧 难度中等 https://leetcode.cn/problems/course-schedule…...

【滑动窗口】LeetCode2953:统计完全子字符串

作者推荐 [二分查找]LeetCode2040:两个有序数组的第 K 小乘积 本题其它解法 【离散差分】LeetCode2953:统计完全子字符串 题目 给你一个字符串 word 和一个整数 k 。 如果 word 的一个子字符串 s 满足以下条件&#xff0c;我们称它是 完全字符串&#xff1a; s 中每个字符…...

base64转PDF

今天做皖事通的对接&#xff0c;下载电子证照后发现回传的是base64&#xff0c;调试确认是个麻烦事&#xff0c;网上搜了一下没有base64转PDF的在线预览功能&#xff0c;只能自己写个调试工具了&#xff0c;以下是通过纯JS方式写的代码&#xff0c;可直接拿去使用&#xff1a; …...

clip-path,css裁剪函数

https://www.cnblogs.com/dzyany/p/13985939.html clip-path - CSS&#xff1a;层叠样式表 | MDN 我们看下这个例子 polygon里有四个值分别代表这四个点相对于原图左上方的偏移量。 裁剪个五角星...

第二证券:食品饮料板块拉升,乳业股亮眼,西部牧业“20cm”涨停

证券时报网讯&#xff0c;食物饮料板块5日盘中拉升走高&#xff0c;乳业股体现活跃&#xff0c;到发稿&#xff0c;骑士乳业涨超27%&#xff0c;西部牧业“20cm”涨停&#xff0c;阳光乳业亦涨停。 其它个股方面&#xff0c;盖世食物涨超20%&#xff0c;润普食物涨超18%&#…...

React 好用的工具库

1、html-react-parser HTML 到 React 解析器&#xff0c;适用于服务器 &#xff08;Node.js&#xff09; 和客户端&#xff08;浏览器&#xff09;&#xff0c;适用于React节点修改过滤等需求 解析器将 HTML 字符串转换为一个或多个 React 元素。可以将一个元素替换为另一个元素…...

C++面试宝典第2题:逆序输出整数

题目 写一个方法&#xff0c;将一个整数逆序打印输出到控制台。注意&#xff1a;当输入的数字含有结尾的0时&#xff0c;输出不应带有前导的0。比如&#xff1a;123的逆序输出为321&#xff0c;8600的逆序输出为68&#xff0c;-609的逆序输出为-906。 解析 这道题本身并没有什么…...

Twincat功能块使用经验总结

控制全局变量&#xff1a; //轴控制指令 bi_Power: BOOL; //使能 bi_Reset: BOOL; //复位 bi_Stop: BOOL; //停止 bi_JogForward: BOOL; //正向点动 bi_JogBackwards: BOOL; //反向点动 bi_MoveAdditive: BOOL; //增量位…...

香港服务器时间不准,差8小时

解决方案1 1、timedatectl查看系统时间 2、查看系统时区 ls /usr/share/zoneinfo 3、删除当前系统所处时区 rm /etc/localtime 4、创建软链接&#xff0c;以替换当前的时区信息 ln -s /usr/share/zoneinfo/Universal /etc/localtime 解决方案2 手动设置硬件时钟 1、设置系…...

C++ 抽象类和接口 详解

目录 0 引言1 抽象类2 接口2.1 Java与C接口的区别 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;C专栏&#x1f4a5; 标题&#xff1a;C 抽象类和接口 详解❣️ 寄语&#xff1a;书到用时方恨少&#xff0c;事非经过不知难&#xff01;&#x1f…...

【Linux】awk 使用

awk 输出 // 打印所有列 $ awk {print $0} file // 打印第一列 $ awk {print $1} file // 打印第一和第三列 $ awk {print $1, $3} file // 打印第三列和第一列&#xff0c;注意先后顺序 $ cat file | awk {print $3, $1} …...

LeetCode力扣每日一题(Java):9、回文数

一、题目 二、解题思路 1、我的思路 当x<0时&#xff0c;x一定不是回文数&#xff0c;直接返回false 当x>0且x<10时&#xff0c;x一定是回文数&#xff0c;直接返回true x>10时&#xff0c;先将x转为字符串。将数字转成字符串方法挺多的&#xff0c;以下是&…...

WPF前端实现人脸扫描动画效果

前言 本章实现的效果主要通过OpacityMask与LinearGradientBrush(径向渐变) 的组合应用来实现。最终实现效果如下: LinearGradientBrush线性渐变画刷 LinearGradientBrush其实很简单,我们只需要关注5个属性,使用这5个属性你就可以完成这个画刷几乎所有的变化。 属性介…...

更改AndroidStudio模拟器位置

C盘何等的珍贵&#xff0c;可是好多工具&#xff0c;软件非得默认安装在C盘。。导致C盘越来越紧张。。 在日常使用过程中&#xff0c;安装任何软件都会将其安装到非系统盘下&#xff0c;Android模拟器也不能例外。保护好C盘也是日常一个良好的习惯。 Android AVD默认路径&…...

Dash 协议介绍

<?xml version"1.0" encoding"utf-8"?> <MPD xmlns"urn:mpeg:dash:schema:mpd:2011" minBufferTime"PT1.5S" type"static" mediaPresentationDuration"PT0H1M0.3S" maxSegmentDuration"PT0H0M2.0…...

RabbitMQ的消息发送和接收机制

所有 MQ 产品从模型抽象上来说都是一样的过程&#xff1a; 消费者&#xff08;consumer&#xff09;订阅某个队列。生产者&#xff08;producer&#xff09;创建消息&#xff0c;然后发布到队列&#xff08;queue&#xff09;中&#xff0c;最后将消息发送到监听的消费者。 上…...

记录111

在两台 RHEL 8 服务器上搭建 PostgreSQL 和 pgpool-II 环境涉及到安装 PostgreSQL、配置流复制&#xff08;Streaming Replication&#xff09;以及安装和配置 pgpool-II。以下是详细的步骤&#xff1a; ### 准备工作 1. **获取服务器**&#xff1a;确保你有两台运行 RHEL 8 的…...

振动和震动的区别?

问题描述&#xff1a;振动和震动的区别&#xff1f; 问题解决&#xff1a; 震动&#xff08;Oscillation&#xff09;&#xff1a; 特点&#xff1a; 随机的、突发的、不经常的、无规律的运动。例子&#xff1a; 地壳震动、消息震动全国&#xff0c;强调的是运动的力度或幅度&…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

Android Wi-Fi 连接失败日志分析

1. Android wifi 关键日志总结 (1) Wi-Fi 断开 (CTRL-EVENT-DISCONNECTED reason3) 日志相关部分&#xff1a; 06-05 10:48:40.987 943 943 I wpa_supplicant: wlan0: CTRL-EVENT-DISCONNECTED bssid44:9b:c1:57:a8:90 reason3 locally_generated1解析&#xff1a; CTR…...

ESP32 I2S音频总线学习笔记(四): INMP441采集音频并实时播放

简介 前面两期文章我们介绍了I2S的读取和写入&#xff0c;一个是通过INMP441麦克风模块采集音频&#xff0c;一个是通过PCM5102A模块播放音频&#xff0c;那如果我们将两者结合起来&#xff0c;将麦克风采集到的音频通过PCM5102A播放&#xff0c;是不是就可以做一个扩音器了呢…...

uniapp微信小程序视频实时流+pc端预览方案

方案类型技术实现是否免费优点缺点适用场景延迟范围开发复杂度​WebSocket图片帧​定时拍照Base64传输✅ 完全免费无需服务器 纯前端实现高延迟高流量 帧率极低个人demo测试 超低频监控500ms-2s⭐⭐​RTMP推流​TRTC/即构SDK推流❌ 付费方案 &#xff08;部分有免费额度&#x…...

自然语言处理——Transformer

自然语言处理——Transformer 自注意力机制多头注意力机制Transformer 虽然循环神经网络可以对具有序列特性的数据非常有效&#xff0c;它能挖掘数据中的时序信息以及语义信息&#xff0c;但是它有一个很大的缺陷——很难并行化。 我们可以考虑用CNN来替代RNN&#xff0c;但是…...

成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战

在现代战争中&#xff0c;电磁频谱已成为继陆、海、空、天之后的 “第五维战场”&#xff0c;雷达作为电磁频谱领域的关键装备&#xff0c;其干扰与抗干扰能力的较量&#xff0c;直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器&#xff0c;凭借数字射…...

算法笔记2

1.字符串拼接最好用StringBuilder&#xff0c;不用String 2.创建List<>类型的数组并创建内存 List arr[] new ArrayList[26]; Arrays.setAll(arr, i -> new ArrayList<>()); 3.去掉首尾空格...

基于 TAPD 进行项目管理

起因 自己写了个小工具&#xff0c;仓库用的Github。之前在用markdown进行需求管理&#xff0c;现在随着功能的增加&#xff0c;感觉有点难以管理了&#xff0c;所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD&#xff0c;需要提供一个企业名新建一个项目&#…...

安卓基础(Java 和 Gradle 版本)

1. 设置项目的 JDK 版本 方法1&#xff1a;通过 Project Structure File → Project Structure... (或按 CtrlAltShiftS) 左侧选择 SDK Location 在 Gradle Settings 部分&#xff0c;设置 Gradle JDK 方法2&#xff1a;通过 Settings File → Settings... (或 CtrlAltS)…...

【Elasticsearch】Elasticsearch 在大数据生态圈的地位 实践经验

Elasticsearch 在大数据生态圈的地位 & 实践经验 1.Elasticsearch 的优势1.1 Elasticsearch 解决的核心问题1.1.1 传统方案的短板1.1.2 Elasticsearch 的解决方案 1.2 与大数据组件的对比优势1.3 关键优势技术支撑1.4 Elasticsearch 的竞品1.4.1 全文搜索领域1.4.2 日志分析…...