Auto Egress not working for agent track

Open willsmanley opened this issue 1 year ago • 8 comments

I am creating a room with autoegress for both the room composite and individual tracks.

My goal is to store 3 recordings: 1) user track, 2) agent track, 3) composite track

1 and 3 are working great, but the agent track is not being egressed properly.

I can see that the agent identity is randomized like "agent-AJ_5quWN4B3jyEh"

Here is the code I am using to create the egress requests from the room:

import "jsr:@supabase/functions-js/edge-runtime.d.ts";
import { AccessToken, RoomServiceClient, S3Upload } from 'npm:livekit-server-sdk';
import { ulid } from 'npm:ulid';

const LIVEKIT_URL = 'asdf';
const LIVEKIT_API_URL = 'asdf';
const LIVEKIT_API_KEY = 'asdf';
const LIVEKIT_API_SECRET = 'asdf';

async function generateToken(): Promise<{ accessToken: string, url: string }> {
  const roomName = ulid();

  const accessToken = new AccessToken(LIVEKIT_API_KEY, LIVEKIT_API_SECRET, {
    identity: "human_user",
  });

  accessToken.addGrant({
    roomJoin: true,
    roomCreate: true,
    roomRecord: true,
    room: roomName,
    canPublish: true,
    canSubscribe: true,
    canPublishData: true,
  });

  const token = await accessToken.toJwt();

  await createRoomWithEgress(roomName, token);

  return {
    accessToken: token,
    url: LIVEKIT_URL,
  };
}

async function createRoomWithEgress(roomName: string, token: string) {
  const output = {
    case: 's3',
    value: new S3Upload({
      secret: "asdf", 
      bucket: "asdf",
      endpoint: 'asdf',
      accessKey: "asdf",
    }),
  };
  const roomClient = new RoomServiceClient(LIVEKIT_API_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET);
  await roomClient.createRoom({
    name: roomName,
    egress: {
      room: {
        roomName: roomName,
        audioOnly: true,
        fileOutputs: [
          {
            filepath: `${roomName}/room-{time}.ogg`,
            output,
          }
        ]
      },
      tracks: {
        filepath: `${roomName}/{publisher_identity}-{time}.ogg`,
        output,
      },
    }
  });

  console.log('Room created successfully:', roomName);
}

Deno.serve(async (req) => {
  return new Response(
    JSON.stringify(await generateToken()),
    { headers: { "Content-Type": "application/json" } },
  );
});

You don't have to run it from deno, it works the same in node or vanilla twirp.

This successfully records the human_user track, but not the agent track. Maybe there is something I am doing wrong, but my understanding is that this should automatically save any participant tracks that join the room.

There is no problem with the request being received properly. If you send a poorly structured request it will reject it with 400 Bad Request. If you send a poorly structured S3 object it will fail. I have confirmed neither of these are applicable as it is storing the other recordings via S3 successfully.

willsmanley avatar Aug 22 '24 16:08 willsmanley

This is apparently a known bug, so I will close this for now. I worked around this by manually egressing the agent track once it has a track ID.

willsmanley avatar Sep 09 '24 20:09 willsmanley

Can you please share your solution for this, Wills? @willsmanley

firattamurlc avatar Oct 20 '24 17:10 firattamurlc

I reopened since I haven't seen this tracked anywhere else. So here's my solution which is a "manual egress" (a second API request) once the track already has an ID:

from datetime import datetime
import random
import string
import livekit.api
from livekit.api.livekit_api import LiveKitAPI
from livekit.protocol.egress import TrackEgressRequest
from livekit.agents import JobContext
import os
import asyncio


def queue_egress_agent_track_once_published(ctx: JobContext):
    asyncio.create_task(_queue_egress_agent_track_once_published(ctx))


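async def _queue_egress_agent_track_once_published(ctx: JobContext):
    # Poll until the agent's first publication has been assigned a track ID (sid),
    # then request track egress for it exactly once. The loop alone is enough;
    # a recursive call here would fire duplicate egress requests as it unwinds.
    while True:
        await asyncio.sleep(0.1)
        for publication in ctx.room.local_participant.track_publications.values():
            if publication.sid:
                await egress_agent_track(ctx.room.name, publication.sid)
                return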


async def egress_agent_track(room_name: str, track_id: str):
    api = LiveKitAPI(
        url=os.getenv("LIVEKIT_URL"),
        api_key=os.getenv("LIVEKIT_API_KEY"),
        api_secret=os.getenv("LIVEKIT_API_SECRET"),
    )
    random_string = "".join(random.choices(string.ascii_letters + string.digits, k=12))
    unique_time = datetime.now().strftime("%Y-%m-%dT%H%M%S") + f"-{random_string}"
    track_egress_request = TrackEgressRequest(
        room_name=room_name,
        track_id=track_id,
        file=livekit.api.DirectFileOutput(
            filepath=f"{room_name}/agent-{unique_time}.ogg",
            s3=livekit.api.S3Upload(
                access_key=os.getenv("S3_KEY"),
                secret=os.getenv("S3_SECRET"),
                bucket=os.getenv("S3_SESSION_RECORDINGS_BUCKET"),
                endpoint=os.getenv("S3_ENDPOINT"),
            ),
        ),
    )
    try:
        egress_info = await api.egress.start_track_egress(track_egress_request)
        print(f"started track egress {egress_info}")
    except Exception as e:
        print(f"Error starting track egress: {e}")
    finally:
        await api.aclose()

Note that I added some extra async logic which is not required. Also note that I had to implement a poll that waits to fire the request until the publication ID exists. I don't think there's a good way to set up a listener for it and the poll seems to work just fine.
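
For anyone adapting the poll, it may be worth bounding it with a timeout so it cannot spin forever if the agent never publishes a track. Below is a minimal sketch using only the standard library. The names `wait_for_track_sid`, `FakePublication`, and `get_publications` are hypothetical and not part of any LiveKit API; `get_publications` stands in for something like `ctx.room.local_participant.track_publications.values`.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Iterable, Optional


@dataclass
class FakePublication:
    """Hypothetical stand-in for a track publication; only `sid` matters here."""
    sid: str = ""


async def wait_for_track_sid(
    get_publications: Callable[[], Iterable],
    interval: float = 0.1,
    timeout: float = 10.0,
) -> Optional[str]:
    """Poll until a publication has a non-empty sid; return None on timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        for publication in get_publications():
            if publication.sid:
                return publication.sid
        await asyncio.sleep(interval)
    return None


async def demo() -> Optional[str]:
    publications = [FakePublication()]

    async def publish_later() -> None:
        # Simulate the track ID being assigned shortly after the agent joins.
        await asyncio.sleep(0.05)
        publications[0].sid = "TR_example"

    publisher = asyncio.create_task(publish_later())
    sid = await wait_for_track_sid(lambda: publications, interval=0.01, timeout=1.0)
    await publisher
    return sid
```

In the workaround above, the returned sid would be passed to `egress_agent_track`, and a `None` result could be logged instead of retrying indefinitely.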

But I was told that LiveKit is aware of the auto egress bug and it's something they will fix.

willsmanley avatar Oct 20 '24 17:10 willsmanley

Thank you. I think it will be useful for others too. I looked through docs all day to figure this out.

firattamurlc avatar Oct 20 '24 22:10 firattamurlc

Thank you very much! The agent should publish its track first, then we can make the egress request so that the egress can record audio of the user and agent. Did I understand correctly? I followed the code at https://docs.livekit.io/home/egress/autoegress/#automatically-record-all-tracks-to-s3 when generating token, but it only recorded the user's audio.

BaiMoHan avatar Oct 21 '24 07:10 BaiMoHan

Hey, could you please take a look at my problem and let me know if you have encountered similar situations before? https://github.com/livekit/agents/issues/952

BaiMoHan avatar Oct 21 '24 08:10 BaiMoHan

The agent should publish its track first, then we can make the egress request so that the egress can record audio of the user and agent. Did I understand correctly?

Yes exactly. Literally just update your S3 vars and call queue_egress_agent_track_once_published at the end of entrypoint and you're all set.

willsmanley avatar Oct 21 '24 23:10 willsmanley

we are going to handle this on our end to make it so that AutoEgress works for Agent tracks. what's proposed here makes a ton of sense.

davidzhao avatar Oct 22 '24 03:10 davidzhao

It seems to be fixed, doesn't it? Maybe you should make a notice and then close this.

BaiMoHan avatar Oct 28 '24 13:10 BaiMoHan

I have verified that the agents are now working correctly in my recordings. The issue appears to be resolved. Thank you for your assistance.

firattamurlc avatar Oct 28 '24 19:10 firattamurlc

@BaiMoHan, yes! we've fixed it on our side. sorry for taking so long!

davidzhao avatar Oct 29 '24 06:10 davidzhao

Hi @willsmanley, I have used your solution (with minor edits) to upload the recording of the session to Google Cloud Storage (GCPUpload).

track_egress_request = TrackEgressRequest(
    room_name=room_name,
    track_id=track_id,
    file=livekit.api.DirectFileOutput(
        filepath=f"{room_name}/agent-{unique_time}.ogg",
        gcp=livekit.api.GCPUpload(
            credentials=file_contents,
            bucket="livekit-agent-recorder",
        ),
    ),
)

But when I listen to the audio recording, I only hear the agent's responses and NOT what the user says. I would like the user's speech to be part of the final audio as well, so that I can listen to the whole conversation. Please help me with this, thanks.

Kamal-Moha avatar Apr 20 '25 09:04 Kamal-Moha