封面:PIXIV 82860854

@やたぬき圭

我们在上一篇已经了解了如何使用 MongoDB 存入每一次的对话记录。接下来我们将尝试使用这些聊天记录为我们的 AI 提供更加久远的记忆,我们将要自动化这一过程。

前言

在与 AI 的交互过程中,我们不可能每一分每一秒都在交互,要实现更加合理地管理聊天记录和记忆簇,我们将会使用 “会话” 机制管理每一次交互。注意,这与 ChatGPT 网页版交互理念是不一样的。区别在于,ChatGPT 提供了用户可选的 会话回溯 机制,我们可以找回之前的对话并继续之前的内容。但为什么在这里我没有采用这个方式呢?因为个人助理 有别于 单个 LLM 工具,她是具有 时空属性 的。你既可以理解为,我们的 AI 助理可以自动化回溯会话这一过程,也可以理解为这是一种“更拟人”的体现。在这种情况下,对话相对会更加自然而流畅,同时我认为 ChatGPT 网页版也是权衡成本,毕竟这么多用户全都自动化会话数据库那开销就要上天了。

你会发现,上一段话多次提到了会话这一概念,那我们如何理解呢?

会话

会话指的是两个或多个参与者之间的交谈或交流。在计算机和网络领域,会话通常指的是用户与系统之间的一段互动时间。例如,当你登录一个网站并与其进行交互(如浏览页面、发送消息等)时,这段时间就被称为一个会话。

会话超时

会话超时则是指在一段时间内没有任何活动后,系统自动结束该会话的状态。这通常是为了安全考虑,比如防止他人在你离开电脑时访问你的账户。比如,如果你在某个网站上停留了太久没有点击任何东西,系统可能会自动注销你,这个过程就叫做会话超时。

简而言之:

  • 会话是你和系统互动的时间段。
  • 会话超时是因为长时间没有互动而自动结束这个时间段的过程。

我们今天要实现的功能就是,设置一个会话系统,当会话超时了,系统会自动把刚刚的会话的所有内容进行总结,并存到 MongoDB 数据库里,便于后续我们回溯。

计时器模块

计时器模块我们有两个选择:线程计时器异步计时器 。这两种计时器各有优劣,但是考虑到本教程是从零教程,同时由于使用异步计时器需要所有的异步操作都需要在事件循环中运行,这可能不利于读者理解。再加之不是所有的函数都是异步函数,故本篇文章暂时采用线程计时器作为会话超时的计时器使用。这里避免误导提前说明,没有对错,只有对于不同场景下的权衡利弊的选择。

我们上一篇文章为大家介绍了自定义模块,今天我们仍然会创建自定义模块。

我们创建一个 cyberaitimer.py 文件,它在文件树里面看起来应该是这样的:

your_project/
│
├── main.py
└── cyberaimodules/
    ├── __init__.py
    ├── cyberaimongo.py 
    └── cyberaitimer.py # 计时器模块

下面是我们的模块代码:

# 计时器模块
import threading

class CyberaiTimer:
    """
    CyberaiTimer 类用于创建一个定时器,在指定的超时时间后执行回调函数。

    Attributes:
        timeout (float): 定时器的超时时间(以秒为单位)。
        callback (function): 定时器超时后要执行的回调函数。
        timer (threading.Timer): threading.Timer 对象,用于实现定时器功能。
    """

    def __init__(self, timeout, callback):
        """
        初始化 CyberaiTimer 实例。

        Args:
            timeout (float): 定时器的超时时间(以秒为单位)。
            callback (function): 定时器超时后要执行的回调函数。
        """
        self.timeout = timeout
        self.callback = callback
        self.timer = None

    def start_timer(self):
        """
        启动定时器。如果定时器已经在运行,则先取消当前定时器,然后重新启动。
        """
        if self.timer:
            self.timer.cancel()
        self.timer = threading.Timer(self.timeout, self.callback)
        self.timer.start()

    def stop_timer(self):
        """
        停止定时器。如果定时器正在运行,则取消定时器并将其设为 None。
        """
        if self.timer:
            self.timer.cancel()
            self.timer = None

这个模块的设计思路是这样的:

  1. 初始化函数 (init):
    • 当我们创建一个新的计时器时,我们需要告诉它两件事: a) 要等多长时间 (timeout) b) 时间到了要做什么 (callback)
    • 这就像设置一个闹钟,你要设定响铃时间,并决定闹铃响时要做什么。
  2. 启动计时器 (start_timer):
    • 这个函数就像按下闹钟的启动按钮。
    • 如果闹钟已经在运行,我们先关掉它,然后重新设置。这确保我们不会有两个闹钟同时运行。
  3. 停止计时器 (stop_timer):
    • 这就像在闹铃响之前把闹钟关掉。
    • 如果计时器正在运行,我们取消它并将其设置为 None (表示没有正在运行的计时器)。
  4. 使用 threading.Timer:
    • Python 提供了一个内置的计时器工具叫 threading.Timer
    • 这就像闹钟的内部机制。我们不需要知道它具体如何工作,只需要知道如何使用它。
  5. 为什么使用类?
    • 使用类可以把所有与计时器相关的东西(超时时间、回调函数、开始、停止)组织在一起。
    • 这样,每次需要一个新的计时器时,我们只需创建这个类的一个新实例,而不用到处复制粘贴代码。
  6. 灵活性:
    • 这个设计允许我们创建多个不同的计时器,每个都有自己的超时时间和要执行的任务。

会话 (session_id) 系统

对于当前用户,有且只有一个会话 ID 是当前有效的,同时还要能够在计时器到时间的时候回调 (call_back) 给相对应的函数,这种情况下用一个全局变量可能是一个比较好的选择……

(是的,你可能会觉得全局变量不合适,如果你这么觉得,你肯定不是小白,所以你想用什么就用什么,上下文管理器 contextmanagerthreading.local() 或者是 contextvars.ContextVar)等等等等你想用什么就用什么)

会话超时后,全局变量擦除,生成新的会话 ID,之前的会话 ID 存入数据库便于以后通过 ID 来回溯之前的会话。

我们要在之前的 main.py 里面加上一些内容:

session_id = None
chat_id_pool = []

然后在 get_response_from_llm 函数里加上用于更新 session_idchat_id_pool 的内容:

global session_id
if session_id is None:
  session_id = str(uuid.uuid4()).replace('-', '')
  print(f"已生成新的会话ID:{session_id}")

# 把本次对话的记录的绝对ID插入到对话ID列表
chat_id_pool.append(chat_id)

这里的逻辑大概是这样的:当我们开始和AI聊天,判断有无定时器,没有就新建一个定时器开始计时。然后当 get_response_from_llm 监测到 session_id 是空的时候,这表明当前没有会话存在,则新建一个 session_id 。每一次AI回复完成后,都会将本次对话(注意,本次对话指的是你问我答,一来一回两条消息)的 chat_id 插入到 chat_id_pool 列表里以供会话总结的时候用,同时刷新计时器,重新开始会话计时。

当我们很久没有和AI对话并达到了超时时间时,计时器会触发关闭会话函数 end_session_and_start_new 这个函数会负责以下内容:

  1. 读取 chat_id_pool 里的所有 chat_id 并使用 for 循环在数据库里找到所有对应的聊天记录。这些聊天记录代表本次的聊天内容, end_session_and_start_new 函数会对这些内容总结。
  2. chat_id_pool 里的所有 chat_id 以及总结的内容写入 MongoDB 数据库可以参考这个数据结构:
'timestamp': timestamp_str, # 时间戳
'session_id': session_id_json_data, # 会话ID
'session_summary': session_summary, # 会话总结
'session_id_pool': session_id_pool_json_data # 对应的聊天记录列表

对于这些功能,的实现,我们需要修改很多内容……这里有一个列表便于大家理解:

  1. 修改 cyberaimongo 模块,添加 generate_session_id_json 函数和 insert_session 函数,前者用于生成会话数据的 json ,后者用于将会话写入数据库。

  2. main.py 中创建一个函数 end_session_and_start_new ,具体功能上面已经提到过了。

  3. main.py 中导入文章最开始我们创建的计时器模块,在创建好 end_session_and_start_new 函数后我们需要初始化这个计时器模块。

  4. get_response_from_llm 中加入计时器重置和 chat_id_pool 相关操作,并在每次回复成功后更新新的 session_id ,保证每次 session_id 都是唯一且不同。

我们一步一步开始……(这不是AI写的!这不是COT!!!

修改 cyberaimongo 模块

我们添加两个模块:

generate_session_id_json

    def generate_session_id_json(self, chat_id_pool: list, session_summary: str, session_id: str, timestamp: datetime = None) -> str:
        """
        生成会话ID的JSON字符串。

        :param chat_id_pool: 聊天ID池
        :param session_summary: 会话摘要
        :param session_id: 会话ID
        :param timestamp: 时间戳
        :return: 会话ID的JSON字符串
        """
        if timestamp is None:
            timestamp = datetime.now()
        timestamp_str = timestamp.strftime("%Y-%m-%dT%H:%M")

        # 将ObjectId对象转换为字符串
        chat_id_pool = [str(id) for id in chat_id_pool]

        chat_id_pool_json_data = json.dumps(chat_id_pool)
        session_id_json_data = json.dumps(str(session_id))  # 将ObjectId对象转换为字符串

        chat_data = {
            'timestamp': timestamp_str,
            'session_id': session_id_json_data,
            'session_summary': session_summary,
            'chat_id_pool': chat_id_pool_json_data,
        }
        return json.dumps(chat_data)

insert_session

    def insert_session(self, collection_name: str, chat_id_pool: list, session_summary: str, session_id: str, timestamp: datetime = None) -> str:
        """
        插入会话记录到指定集合。

        :param collection_name: 集合名称
        :param chat_id_pool: 会话ID池
        :param session_summary: 会话摘要
        :param session_id: 会话ID
        :param timestamp: 时间戳
        :return: 插入记录的ID
        """
        json_data = self.generate_session_id_json(chat_id_pool, session_summary, session_id, timestamp)
        collection = self.db[collection_name]
        if isinstance(json_data, str):
            json_data = json.loads(json_data)
        return collection.insert_one(json_data).inserted_id

end_session_and_start_new 函数

def end_session_and_start_new():
    """
    结束当前会话并开始新会话
    """
    # 初始化一个空字符串来存储所有元数据
    metadata_str = ''
    global session_id

    # 如果会话 ID 不为空
    for chat_id in chat_id_pool:
        # 获取当前会话的聊天记录
        chat_data = mongo_manager.get_data_by_id(
            collection_name='daily',
            id=chat_id
        )
        # 提取需要的字段并连接成一个字符串
        if chat_data:
            timestamp = chat_data.get('timestamp', '')
            ai_reply = chat_data.get('ai_reply', '')
            human_message = chat_data.get('human_message', '')
            metadata_str += f'{timestamp} \n User: {human_message} \n CyberAi: {ai_reply} \n\n'

    if not metadata_str.strip():
        print('没有聊天记录,无需生成总结')
        return None
    
    # 生成会话总结
    session_summary = summarize_chat_history(metadata_str)

    # 将会话总结插入到数据库
    session_id = mongo_manager.insert_session(
        collection_name='session-history',
        session_id_pool=chat_id_pool,
        session_summary=session_summary,
        session_id=session_id
    )
    print(f"已生成会话总结")

    # 清空聊天记录和会话 ID
    chat_id_pool.clear()
    chat_history.clear()
    print('已清空聊天历史缓存')
    session_id = None
    print('已清空会话 ID')

初始化计时器

# 在main.py开头增加:
from cyberaimodules import cyberaitimer
session_timeout_time = 60  # 定义会话超时时间(以秒为单位)

# 在 `end_session_and_start_new` 函数后初始化计时器:
session_timer = cyberaitimer.CyberaiTimer(timeout=session_timeout_time, callback=end_session_and_start_new)

修改 get_response_from_llm 函数

这里修改了部分内容:

  1. 删除了上一篇的上下文总结记忆,只保留上下文窗口记忆,并使用 SUMMARY_THRESHOLD 决定对话轮次。
  2. 将用户输入和 LLM 响应添加到聊天记录中,此前这一功能在函数外
  3. 启动会话计时器
  4. 暂时注释掉了查询数据库的载入记忆的功能,因为下文我们将会添加常态记忆加载功能。
  5. session_id 相关功能的重置或者生成,前文已经提到。

完整的代码看起来是这样的:

def get_response_from_llm(question: str) -> str:
    """
    获取 LLM 的响应
    :param question: 用户的问题
    :return: LLM 的响应
    """

    global chat_history
    # 将用户输入添加到聊天记录中
    chat_history.append(('human', question))



    # 获取最近的聊天记录窗口
    chat_history_window = "\n".join([f"{role}: {content}" for role, content in chat_history[-2*SUMMARY_THRESHOLD:-1]])
    chat_history_prompt = f"Here is the chat history:\n {chat_history_window}"    

    # 创建消息列表
    message = [
        {"role": "system", "content": "You are a catgirl! Output in Chinese."},
        {"role": "assistant", "content": chat_history_prompt},
        {"role": "user", "content": question},
    ]

    # 调用 LLM 获取响应
    response = chat_model.chat.completions.create(
        model='gpt-4o',
        messages=message,
        temperature=0.7,
    )
    
    print(f"message: {message}")
    # 获取响应内容
    response_str = response.choices[0].message.content

    # 将 LLM 的响应添加到聊天记录中
    chat_history.append(('ai', response_str))
    
		# 启动会话计时器
    session_timer.start_timer()

    # 将对话记录插入到数据库
    chat_id = mongo_manager.insert_chat(
        collection_name='daily',
        ai_reply=response_str,
        human_message=question,
        session_id=str(uuid.uuid4()),
    )

    # query = mongo_manager.get_data_by_id(collection_name='daily', id=chat_id)
    
    global session_id
    if session_id is None:
        session_id = str(uuid.uuid4()).replace('-', '')
        print(f"已生成新的会话ID:{session_id}")

    # 把本次对话的记录的绝对ID插入到数据库
    chat_id_pool.append(chat_id)

    return response_str

记忆加载

上面两个部分我们已经完成了对于聊天记录和会话记忆的写入,现在我们将添加两个简单的功能:加载上几次会话和加载昨天的所有会话。这里抛砖引玉,简单介绍一下常态记忆加载的逻辑:实际使用场景中,对于刚刚发生的事情或者最近发生过的事情被提起的频率会比久远的事情要高。考虑到控制成本的目的,我们可以让 LLM 每一次都加载最近的几次会话的总结内容,达到一个记忆和成本的平衡。如果对于响应速度要求不高,也可以附加一个记忆管理器,用另一个 LLM 与其串联,控制是否触发载入更多记忆或者特定时间段的会话。如果有响应速度要求,也可以使用关键词触发或者使用历史偏好库(需要先前触发数据支撑)决定是否触发记忆。限于篇幅,这里就不做过多介绍了,思路大概是这样。

我们需要修改 cyberaimongo 模块,添加两个函数:get_recent_memget_yesterday_mem ,前者可以获得最近的n次会话的总结,后者可以获得昨天的所有总结。

    def get_recent_mem(self, n: int = 1, collection_name: str = 'session-history', datapart: str = 'session_summary') -> str:
        """
        获取最近的n次记录。如果记录数量不足n,则返回实际数量的记录。
    
        :param n: 记录数量
        :param collection_name: 集合名称
        :param datapart: 数据部分
        :return: 记录摘要字符串
        """
        collection = self.db[collection_name]
        cursor = collection.find().sort('timestamp', -1).limit(n)
        summaries = [[item['timestamp'], item.get(datapart, ''), item.get('tags', [])] for item in cursor]
        if not summaries:
            return ''
        summaries_str = '\n'.join([str(summary) for summary in summaries])
        return summaries_str
    
    def get_yesterday_mem(self, collection_name: str = 'session-history', datapart: str = 'session_summary') -> str:
        """
        获取昨天的所有记录。如果没有记录,则返回空字符串。
    
        :param collection_name: 集合名称
        :param datapart: 数据部分
        :return: 记录摘要字符串
        """
        now = datetime.now()
        start = datetime(now.year, now.month, now.day) - timedelta(days=1)
        end = start + timedelta(days=1)
        sessions = self.get_mem_in_time_range(collection_name, start, end)
        if not sessions:
            return ''
        summaries = [f"{session['timestamp']} {session.get(datapart, '')}" for session in sessions]
        summaries_str = '\n'.join(summaries)
        return summaries_str

然后我们要在 get_response_from_llm 函数里添加有关载入的逻辑……这部分就比较有意思了,正如前文所说的,记忆的触发有很多种方式,各有优缺点,没有最好的,请选择最适合你的方式。另外有一个小细节,当我们获得记忆之后,如果是 GPT,是给放到 system 还是 assistantprompt 里面更合适呢?这个有很多种情况,我个人更倾向于放到 assistant 里面,因为这不会对基本提示词产生较大影响。放在 system 里,如果你还有后续的调用工具的相关反馈,有一定概率造成歧义或发生奇怪的 bug(当然如果是 Claude 就没那么多事情了,所以具体情况具体分析,建议大家亲自做实验检验……)

我们简单添加这几行:

    recent_chat_session_history = mongo_manager.get_recent_mem(n=2, collection_name='session-history', datapart='session_summary')
    yesterday_chat_session_history = mongo_manager.get_yesterday_mem(collection_name='session-history', datapart='session_summary')
    chat_history_prompt = f"Here is the memory yesterday:{yesterday_chat_session_history}\n Here is the memory recently:{recent_chat_session_history} \n Here is the current chat history:\n {chat_history_window}"    

如果你不想每次都触发,你可以加一个二级触发器或者使用关键字触发,你也可以更改 n 的数字来设置载入之前的 n 次会话内容。

下面是完整的 main.py :

# main.py
import os
import uuid
from openai import OpenAI

# 导入之前创建的 cyberaimodules 模块
from cyberaimodules import cyberaimongo
from cyberaimodules import cyberaitimer

mongo_host = os.getenv('MONGO_HOST', 'localhost')
mongo_port = int(os.getenv('MONGO_PORT', 27017))
mongo_user = os.getenv('MONGO_USER', 'admin')
mongo_password = os.getenv('MONGO_PASSWORD', 'secret')
# 数据库的名字叫做 chat_history
mongo_db_name = os.getenv('MONGO_DB_NAME', 'chat_history')

# 初始化 MongoDB 客户端
mongo_manager = cyberaimongo.MongoManager(
    host=mongo_host,
    port=mongo_port,
    username=mongo_user,
    password=mongo_password,
    db_name=mongo_db_name,
)

# 初始化 OpenAI 模型
chat_model= OpenAI(
	# 你需要把这个替换成你的后端的API地址
	base_url="https://api.openai.com/v1/",
	# 这是用于身份验证的 API Key
	api_key = "sk-SbmHyhKJHt3378h9dn1145141919810D1Fbcd12d"
	)

# 聊天记录和总结记录的列表
chat_history = []
SUMMARY_THRESHOLD = 4  # 定义上下文窗口长度
session_timeout_time = 600  # 定义会话超时时间(以秒为单位)
session_id = None
chat_id_pool = []


def summarize_chat_history(chat_history_window):
    """
    对最近的聊天记录进行总结
    :param chat_history_window: 最近的聊天记录
    :return: 总结后的字符串
    """
    # 创建总结提示
    summary_prompt = f"请总结以下对话内容:\n{chat_history_window}"

    print(f"正在对以下内容生成总结: {chat_history_window}")
    # 调用 LLM 生成总结
    summary_response = chat_model.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{"role": "user", "content": summary_prompt}],
        temperature=0.7,
    )
    # 获取总结内容
    summary_str = summary_response.choices[0].message.content
    print(f"生成的总结: {summary_str}")
    return summary_str


def end_session_and_start_new():
    """
    结束当前会话并开始新会话
    """
    # 初始化一个空字符串来存储所有元数据
    metadata_str = ''
    global session_id

    # 如果会话 ID 不为空
    for chat_id in chat_id_pool:
        # 获取当前会话的聊天记录
        chat_data = mongo_manager.get_data_by_id(
            collection_name='daily',
            id=chat_id
        )
        # 提取需要的字段并连接成一个字符串
        if chat_data:
            timestamp = chat_data.get('timestamp', '')
            ai_reply = chat_data.get('ai_reply', '')
            human_message = chat_data.get('human_message', '')
            metadata_str += f'{timestamp} \n User: {human_message} \n CyberAi: {ai_reply} \n\n'

    if not metadata_str.strip():
        print('没有聊天记录,无需生成总结')
        return None
    
    # 生成会话总结
    session_summary = summarize_chat_history(metadata_str)
    print(f"sessionsummary: {session_summary}")

    # 将会话总结插入到数据库
    session_id = mongo_manager.insert_session(
        collection_name='session-history',
        chat_id_pool=chat_id_pool,
        session_summary=session_summary,
        session_id=session_id
    )
    print(f"已生成会话总结")

    # 清空聊天记录和会话 ID
    chat_id_pool.clear()
    chat_history.clear()
    print('已清空聊天历史缓存')
    session_id = None
    print('已清空会话 ID')

session_timer = cyberaitimer.CyberaiTimer(timeout=session_timeout_time, callback=end_session_and_start_new)


def get_response_from_llm(question: str) -> str:
    """
    获取 LLM 的响应
    :param question: 用户的问题
    :return: LLM 的响应
    """
    global chat_history
    # 将用户输入添加到聊天记录中
    chat_history.append(('human', question))


    recent_chat_session_history = mongo_manager.get_recent_mem(n=2, collection_name='session-history', datapart='session_summary')
    yesterday_chat_session_history = mongo_manager.get_yesterday_mem(collection_name='session-history', datapart='session_summary')

    # 获取最近的聊天记录窗口
    chat_history_window = "\n".join([f"{role}: {content}" for role, content in chat_history[-2*SUMMARY_THRESHOLD:-1]])
    chat_history_prompt = f"Here is the memory yesterday:{yesterday_chat_session_history}\n Here is the memory recently:{recent_chat_session_history} \n Here is the current chat history:\n {chat_history_window}"    

    print(f"chat_history_prompt: {chat_history_prompt}\n")
    # 创建消息列表
    message = [
        {"role": "system", "content": "You are a catgirl! Output in Chinese."},
        {"role": "assistant", "content": chat_history_prompt},
        {"role": "user", "content": question},
    ]

    # 调用 LLM 获取响应
    response = chat_model.chat.completions.create(
        model='gpt-4o',
        messages=message,
        temperature=0.7,
    )
    
    print(f"message: {message}")
    # 获取响应内容
    response_str = response.choices[0].message.content

    # 将 LLM 的响应添加到聊天记录中
    chat_history.append(('ai', response_str))

    # 启动会话计时器
    session_timer.start_timer()


    # 将对话记录插入到数据库
    chat_id = mongo_manager.insert_chat(
        collection_name='daily',
        ai_reply=response_str,
        human_message=question,
        session_id=str(uuid.uuid4()),
    )

    # query = mongo_manager.get_data_by_id(collection_name='daily', id=chat_id)
    
    global session_id
    if session_id is None:
        session_id = str(uuid.uuid4()).replace('-', '')
        print(f"已生成新的会话ID:{session_id}")

    # 把本次对话的记录的绝对ID插入到数据库
    chat_id_pool.append(chat_id)

    return response_str


if __name__ == "__main__":
    while True:
        user_input = input("\n输入问题或者请输入'exit'退出:")
        if user_input.lower() == 'exit':
            print("再见")
            break 
        
        # 获取 LLM 的响应
        response = get_response_from_llm(user_input)
        
        # 打印 LLM 的响应
        print(response)

这里是完整的 cyberaimongo 模块:

# cyberaimongo.py - MongoDB管理模块
import json
import uuid
from datetime import datetime, timedelta
from pymongo import MongoClient

class MongoManager:
    def __init__(self, host: str = 'localhost', port: int = 27017, username: str = 'admin', password: str = 'secret', db_name: str = 'chat_history'):
        """
        初始化MongoManager类,连接到MongoDB数据库。

        :param host: MongoDB主机地址
        :param port: MongoDB端口号
        :param username: MongoDB用户名
        :param password: MongoDB密码
        :param db_name: 数据库名称
        """
        self.client = MongoClient(f'mongodb://{username}:{password}@{host}:{port}/')
        self.db = self.client[db_name]

    def generate_chat_json(self, ai_reply: str, human_message: str, session_id: str, timestamp: datetime = None) -> str:
        """
        生成聊天记录的JSON字符串。

        :param ai_reply: AI回复内容
        :param human_message: 人类消息内容
        :param session_id: 会话ID
        :param timestamp: 时间戳
        :return: 聊天记录的JSON字符串
        """
        if timestamp is None:
            timestamp = datetime.now()
        timestamp_str = timestamp.strftime("%Y-%m-%dT%H:%M")
        chat_data = {
            'message_id': str(uuid.uuid4()),
            'ai_reply': ai_reply,
            'human_message': human_message,
            'timestamp': timestamp_str,
            'session_id': session_id
        }
        return json.dumps(chat_data, ensure_ascii=False, indent=4)
    
    def generate_session_id_json(self, chat_id_pool: list, session_summary: str, session_id: str, timestamp: datetime = None) -> str:
        """
        生成会话ID的JSON字符串。

        :param chat_id_pool: 聊天ID池
        :param session_summary: 会话摘要
        :param session_id: 会话ID
        :param timestamp: 时间戳
        :return: 会话ID的JSON字符串
        """
        if timestamp is None:
            timestamp = datetime.now()
        timestamp_str = timestamp.strftime("%Y-%m-%dT%H:%M")

        # 将ObjectId对象转换为字符串
        chat_id_pool = [str(id) for id in chat_id_pool]

        chat_id_pool_json_data = json.dumps(chat_id_pool)
        session_id_json_data = json.dumps(str(session_id))  # 将ObjectId对象转换为字符串

        chat_data = {
            'timestamp': timestamp_str,
            'session_id': session_id_json_data,
            'session_summary': session_summary,
            'chat_id_pool': chat_id_pool_json_data,
        }
        return json.dumps(chat_data)
    
    def insert_chat(self, collection_name: str, ai_reply: str, human_message: str, session_id: str, timestamp: datetime = None) -> str:
        """
        插入聊天记录到指定集合。

        :param collection_name: 集合名称
        :param ai_reply: AI回复内容
        :param human_message: 人类消息内容
        :param session_id: 会话ID
        :param timestamp: 时间戳
        :return: 插入记录的ID
        """
        json_data = self.generate_chat_json(ai_reply, human_message, session_id, timestamp)
        collection = self.db[collection_name]
        if isinstance(json_data, str):
            json_data = json.loads(json_data)
        return collection.insert_one(json_data).inserted_id

    def insert_session(self, collection_name: str, chat_id_pool: list, session_summary: str, session_id: str, timestamp: datetime = None) -> str:
        """
        插入会话记录到指定集合。

        :param collection_name: 集合名称
        :param chat_id_pool: 会话ID池
        :param session_summary: 会话摘要
        :param session_id: 会话ID
        :param timestamp: 时间戳
        :return: 插入记录的ID
        """
        json_data = self.generate_session_id_json(chat_id_pool, session_summary, session_id, timestamp)
        collection = self.db[collection_name]
        if isinstance(json_data, str):
            json_data = json.loads(json_data)
        return collection.insert_one(json_data).inserted_id

    def get_mem_in_time_range(self, collection_name: str, start_time: datetime, end_time: datetime) -> list:
        """
        获取指定时间范围内的记录。

        :param collection_name: 集合名称
        :param start_time: 开始时间
        :param end_time: 结束时间
        :return: 记录列表
        """
        collection = self.db[collection_name]
        cursor = collection.find({
            'timestamp': {
                '$gte': start_time.isoformat(),
                '$lte': end_time.isoformat()
            }
        })
        return list(cursor)

    def get_data_by_id(self, collection_name: str, id: str) -> dict:
        """
        根据ID获取记录。

        :param collection_name: 集合名称
        :param id: 记录ID
        :return: 记录数据
        """
        collection = self.db[collection_name]
        data = collection.find_one({'_id': id})
        return data
    
    def get_recent_mem(self, n: int = 1, collection_name: str = 'session-history', datapart: str = 'session_summary') -> str:
        """
        获取最近的n次记录。如果记录数量不足n,则返回实际数量的记录。

        :param n: 记录数量
        :param collection_name: 集合名称
        :param datapart: 数据部分
        :return: 记录摘要字符串
        """
        try:
            collection = self.db[collection_name]
            cursor = collection.find().sort('timestamp', -1).limit(n)
            summaries = [[item['timestamp'], item[datapart]] for item in cursor]
            
            if not summaries:
                return ''
            
            summaries_str = '\n'.join([str(summary) for summary in summaries])
            return summaries_str
        
        except Exception as e:
            print(f"An error occurred: {e}")
            return ''

    
    def get_yesterday_mem(self, collection_name: str = 'session-history', datapart: str = 'session_summary') -> str:
        """
        获取昨天的所有记录。如果没有记录,则返回空字符串。
    
        :param collection_name: 集合名称
        :param datapart: 数据部分
        :return: 记录摘要字符串
        """
        now = datetime.now()
        start = datetime(now.year, now.month, now.day) - timedelta(days=1)
        end = start + timedelta(days=1)
        sessions = self.get_mem_in_time_range(collection_name, start, end)
        if not sessions:
            return ''
        summaries = [f"{session['timestamp']} {session[datapart]}" for session in sessions]
        summaries_str = '\n'.join(summaries)
        return summaries_str
    

至此尝试运行主程序,应该就可以实现自动的总结以及会话记忆/加载功能了。实际使用下来,推荐采用混合触发机制,也就是所谓的动态方案,你需要快速回复,就直接关键词。不需要速度懒得想关键词,那就专门弄一个记忆管理器用来触发和管理记忆的读写。

随堂拓展

要不尝试一下把之前介绍过的 TTS 功能写成一个模块并集成到现有代码里?一定会很好玩(