package VoiceCall import ( "fmt" "github.com/go-resty/resty/v2" lksdk "github.com/livekit/server-sdk-go/v2" "github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4/pkg/media/oggwriter" "io" "os/exec" "runtime" "strings" "time" ) type Voice struct { VoiceServiceAddress string `toml:"VoiceServiceAddress"` RoomID string `toml:"RoomID"` ParticipantName string `toml:"ParticipantName"` MakeTokenAddress string `toml:"MakeTokenAddress"` Volume int `toml:"Volume"` RecordingEquipment string `toml:"RecordingEquipment"` player map[string]*TrackWriter roomObj *lksdk.Room refresh func() } var HTTPClient3s = resty.New() func (receiver *Voice) getToken() (string, error) { get, err := HTTPClient3s.R().Get(fmt.Sprintf(receiver.MakeTokenAddress, receiver.RoomID, receiver.ParticipantName)) if err != nil || get.StatusCode() != 200 { return "", err } return string(get.Body()), nil } func (receiver *Voice) SetOnRefresh(refresh func()) { receiver.refresh = refresh } func (receiver *Voice) ConnectRoom() error { var err error var token string token, err = receiver.getToken() if err != nil { return err } receiver.player = make(map[string]*TrackWriter) receiver.roomObj, err = lksdk.ConnectToRoomWithToken(receiver.VoiceServiceAddress, token, &lksdk.RoomCallback{ ParticipantCallback: lksdk.ParticipantCallback{ OnTrackSubscribed: receiver.onTrackSubscribed, OnTrackUnsubscribed: receiver.onTrackUnsubscribed, }, }) if err != nil { return err } return nil } func (receiver *Voice) PushMicrophone() error { // 判断当前是windows还是linux var cmd *exec.Cmd if strings.Contains(runtime.GOOS, "windows") { cmd = exec.Command( "ffmpeg", "-f", "dshow", "-i", receiver.RecordingEquipment, "-ac", "2", // 强制为单声道 "-ar", "48000", // 设置采样率为 48kHz "-c:a", "libopus", "-b:a", "32k", // 设置比特率为 32 kbps "-frame_duration", "10", // 每帧持续 20ms "-application", "lowdelay", // 设置低延迟编码 "-f", "ogg", "-page_duration", "10000", "pipe:1", ) } else { cmd = exec.Command( "ffmpeg", "-f", "alsa", // 使用 ALSA 捕获音频 "-i", receiver.RecordingEquipment, // 音频设备,例如 "hw:0,0" "-ac", "2", // 强制为单声道 "-ar", "48000", // 设置采样率为 48kHz "-c:a", "libopus", // 使用 Opus 编解码器 "-b:a", "32k", // 设置比特率为 32 kbps "-frame_duration", "10", // 每帧持续 20ms "-application", "lowdelay", // 设置低延迟编码 "-f", "ogg", // 输出格式为 OGG "-page_duration", "10000", // 设置页持续时间 "pipe:1", // 输出到管道 ) } // Get the stdout pipe stdout, err := cmd.StdoutPipe() if err != nil { fmt.Println("Error creating StdoutPipe:", err) return err } // Start the ffmpeg command if err = cmd.Start(); err != nil { fmt.Println("Error starting ffmpeg command:", err) return err } // Create a new local reader track track, err := lksdk.NewLocalReaderTrack(stdout, webrtc.MimeTypeOpus, lksdk.ReaderTrackWithFrameDuration(10*time.Millisecond), lksdk.ReaderTrackWithOnWriteComplete(func() { fmt.Println("track finished") }), ) if err != nil { fmt.Println("Error creating local reader track:", err) return err } // Publish the track to the room if _, err = receiver.roomObj.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{Name: "live_audio"}); err != nil { fmt.Println("Error publishing track:", err) return err } err = cmd.Wait() if err != nil { if err == io.EOF { fmt.Println("ffmpeg process exited successfully") } else { fmt.Println("Error waiting for ffmpeg process:", err) } return err } // 与房间断开连接 receiver.roomObj.Disconnect() return nil } func (receiver *Voice) onTrackUnsubscribed(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { fileName := fmt.Sprintf("%s-%s", rp.Identity(), track.ID()) v, ok := receiver.player[fileName] if ok { v.stop() delete(receiver.player, fileName) println(receiver.player) } } func (receiver *Voice) onTrackSubscribed(track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { switch { case strings.EqualFold(track.Codec().MimeType, "audio/opus"): default: return } fileName := fmt.Sprintf("%s-%s", rp.Identity(), track.ID()) receiver.player[fileName] = NewTrackWriter(track) println(receiver.player) } type TrackWriter struct { track *webrtc.TrackRemote pr *io.PipeReader pw *io.PipeWriter cmd *exec.Cmd } func NewTrackWriter(track *webrtc.TrackRemote) *TrackWriter { pr, pw := io.Pipe() t := &TrackWriter{ pr: pr, pw: pw, track: track, } go t.start() return t } func (t *TrackWriter) start() { t.cmd = exec.Command("ffplay", "-nodisp", "-f", "ogg", "-") //t.cmd = exec.Command("ffplay", "-f", "ogg", "-") t.cmd.Stdin = t.pr err := t.cmd.Start() if err != nil { fmt.Printf("Error starting ffplay: %v\n", err) return } writer, err := oggwriter.NewWith(t.pw, 48000, 2) if err != nil { println(err) return } defer writer.Close() for { rtpPacket, _, err := t.track.ReadRTP() if err != nil { break } err = writer.WriteRTP(rtpPacket) if err != nil { return } } } func (t *TrackWriter) stop() { t.pr.Close() t.pw.Close() if t.cmd.Process != nil { err := t.cmd.Process.Kill() if err != nil { fmt.Printf("Error sending SIGTERM to ffplay process: %v\n", err) return } } t.cmd.Wait() }