Skip to content

Commit

Permalink
1.优化线程池,缩短生成时间
Browse files Browse the repository at this point in the history
  • Loading branch information
surfaceyu committed Aug 2, 2023
1 parent 87130b1 commit efe839a
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 76 deletions.
103 changes: 70 additions & 33 deletions edgeTTS/communicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
"net/http"
"regexp"
"strings"
"sync"

"github.com/gorilla/websocket"
)

// turnContext is a JSON payload carrying a single service tag, stored under
// the "serviceTag" key.
// NOTE(review): presumably the context object of a "turn.start" message;
// its use is not visible in this view, so confirm against the decoder.
type turnContext struct {
ServiceTag string `json:"serviceTag"`
}

type turnAudio struct {
Type string `json:"type"`
StreamID string `json:"streamId"`
Expand All @@ -36,15 +38,18 @@ type turnMetaInnerText struct {
Length int `json:"Length"`
BoundaryType string `json:"BoundaryType"`
}

// turnMetaInnerData is the "Data" object of one metadata entry.
// stream copies Offset and Duration into the word-boundary chunks it emits.
// NOTE(review): the units of Offset and Duration are not established by this
// file; confirm against the service documentation.
type turnMetaInnerData struct {
Offset int `json:"Offset"`
Duration int `json:"Duration"`
// Text is tagged with a lowercase "text" key, unlike its capitalized siblings.
Text turnMetaInnerText `json:"text"`
}

// turnMetadata is a single entry of the "Metadata" array in an audio.metadata
// message. stream compares Type against ChunkTypeWordBoundary to decide
// whether the entry is forwarded as a chunk.
type turnMetadata struct {
Type string `json:"Type"`
Data turnMetaInnerData `json:"Data"`
}

// turnMeta is the top-level JSON document that stream unmarshals from a text
// message whose path is "audio.metadata".
type turnMeta struct {
Metadata []turnMetadata `json:"Metadata"`
}
Expand All @@ -57,8 +62,16 @@ type communicateChunk struct {
Text string
}

type CommunicateTextOption struct {
// CommunicateTextTask is one unit of synthesis work handled by the worker pool.
type CommunicateTextTask struct {
// id is the task's index in the submitted slice; allocateTask assigns it.
id int
// text is the content passed to mkssml when the request is built in stream.
text string
// option holds the voice, rate and volume used for this task's request.
option CommunicateTextOption

// chunk is created by stream and carries the chunks read from the websocket.
chunk chan communicateChunk
// speechData accumulates the audio chunk payloads appended by process.
speechData []byte
}

type CommunicateTextOption struct {
voice string
rate string
volume string
Expand All @@ -68,9 +81,8 @@ type Communicate struct {
option CommunicateTextOption
proxy string

conn *websocket.Conn

chunk chan communicateChunk
processorLimit int
tasks chan *CommunicateTextTask
}

func NewCommunicate() *Communicate {
Expand All @@ -80,7 +92,8 @@ func NewCommunicate() *Communicate {
rate: "+0%",
volume: "+0%",
},
chunk: make(chan communicateChunk),
processorLimit: 16,
tasks: make(chan *CommunicateTextTask, 16),
}
}

Expand All @@ -106,14 +119,6 @@ func (c *Communicate) WithVoice(voice string) *Communicate {
return c
}

// WithText stores text as the option text unless it is empty, in which case
// the current value is left untouched. It returns the receiver so calls can
// be chained.
func (c *Communicate) WithText(text string) *Communicate {
	if text != "" {
		c.option.text = text
	}
	return c
}

func (c *Communicate) WithRate(rate string) *Communicate {
if !isValidRate(rate) {
return c
Expand Down Expand Up @@ -164,28 +169,22 @@ func (c *Communicate) openWs() *websocket.Conn {
if err != nil {
log.Fatal("dial:", err)
}
c.conn = conn
return c.conn
return conn
}

// turnEnd sends a chunk of type ChunkTypeEnd on the communicator's chunk
// channel. The send blocks until a receiver takes the value.
func (c *Communicate) turnEnd() {
	end := communicateChunk{Type: ChunkTypeEnd}
	c.chunk <- end
}
func (c *Communicate) close() {

func (c *Communicate) Close() {
defer c.conn.Close()
}

func (c *Communicate) stream(text CommunicateTextOption) chan communicateChunk {
func (c *Communicate) stream(text *CommunicateTextTask) chan communicateChunk {
text.chunk = make(chan communicateChunk)
// texts := splitTextByByteLength(removeIncompatibleCharacters(c.text), calcMaxMsgSize(c.voice, c.rate, c.volume))
c.openWs()
conn := c.openWs()
date := dateToString()
c.fillOption(&text)
c.conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("X-Timestamp:%s\r\nContent-Type:application/json; charset=utf-8\r\nPath:speech.config\r\n\r\n{\"context\":{\"synthesis\":{\"audio\":{\"metadataoptions\":{\"sentenceBoundaryEnabled\":false,\"wordBoundaryEnabled\":true},\"outputFormat\":\"audio-24khz-48kbitrate-mono-mp3\"}}}}\r\n", date)))
c.conn.WriteMessage(websocket.TextMessage, []byte(ssmlHeadersPlusData(uuidWithOutDashes(), date, mkssml(
text.text, text.voice, text.rate, text.volume,
c.fillOption(&text.option)
conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("X-Timestamp:%s\r\nContent-Type:application/json; charset=utf-8\r\nPath:speech.config\r\n\r\n{\"context\":{\"synthesis\":{\"audio\":{\"metadataoptions\":{\"sentenceBoundaryEnabled\":false,\"wordBoundaryEnabled\":true},\"outputFormat\":\"audio-24khz-48kbitrate-mono-mp3\"}}}}\r\n", date)))
conn.WriteMessage(websocket.TextMessage, []byte(ssmlHeadersPlusData(uuidWithOutDashes(), date, mkssml(
text.text, text.option.voice, text.option.rate, text.option.volume,
))))

go func() {
Expand All @@ -202,7 +201,7 @@ func (c *Communicate) stream(text CommunicateTextOption) chan communicateChunk {
// finalUtterance := make(map[int]int)
for {
// 读取消息
messageType, data, err := c.conn.ReadMessage()
messageType, data, err := conn.ReadMessage()
if err != nil {
log.Println(err)
return
Expand All @@ -215,15 +214,17 @@ func (c *Communicate) stream(text CommunicateTextOption) chan communicateChunk {
downloadAudio = true
} else if path == "turn.end" {
downloadAudio = false
c.turnEnd()
text.chunk <- communicateChunk{
Type: ChunkTypeEnd,
}
} else if path == "audio.metadata" {
meta := &turnMeta{}
if err := json.Unmarshal(data, meta); err != nil {
log.Fatalf("We received a text message, but unmarshal failed.")
}
for _, v := range meta.Metadata {
if v.Type == ChunkTypeWordBoundary {
c.chunk <- communicateChunk{
text.chunk <- communicateChunk{
Type: v.Type,
Offset: v.Data.Offset,
Duration: v.Data.Duration,
Expand All @@ -249,7 +250,7 @@ func (c *Communicate) stream(text CommunicateTextOption) chan communicateChunk {
if len(data) < headerLength+2 {
log.Fatalf("We received a binary message, but it is missing the audio data.")
}
c.chunk <- communicateChunk{
text.chunk <- communicateChunk{
Type: ChunkTypeAudio,
Data: data[headerLength+2:],
}
Expand All @@ -258,7 +259,43 @@ func (c *Communicate) stream(text CommunicateTextOption) chan communicateChunk {
}
}()

return c.chunk
return text.chunk
}

// allocateTask numbers every task with its position in tasks, sends each one
// on c.tasks in order, and then closes c.tasks so that workers ranging over
// the channel terminate once it is drained.
func (c *Communicate) allocateTask(tasks []*CommunicateTextTask) {
	for idx := 0; idx < len(tasks); idx++ {
		task := tasks[idx]
		task.id = idx
		c.tasks <- task
	}
	close(c.tasks)
}

// process is the body of one pool worker. It takes tasks from c.tasks until
// that channel is closed, starts a stream for each task, and appends every
// audio chunk's payload to the task's speechData. wg.Done is called when the
// worker exits.
//
// A task is finished when a ChunkTypeEnd chunk arrives or when the chunk
// channel is closed. The previous loop ignored the closed-channel case and
// would spin forever on zero values, never reaching wg.Done.
func (c *Communicate) process(wg *sync.WaitGroup) {
	defer wg.Done()
	for t := range c.tasks {
		chunk := c.stream(t)
	drain:
		// Ranging over the channel ends the loop if the channel is closed.
		for v := range chunk {
			switch v.Type {
			case ChunkTypeAudio:
				t.speechData = append(t.speechData, v.Data...)
			case ChunkTypeEnd:
				// Kept from the original: the task's channel is closed once the
				// end marker has been seen.
				// NOTE(review): the stream goroutine keeps reading after
				// turn.end; a later send on this closed channel would panic.
				close(t.chunk)
				break drain
			}
			// Other chunk types (e.g. word boundaries) are ignored here.
		}
	}
}

// createPool starts c.processorLimit worker goroutines running process and
// blocks until all of them have returned.
func (c *Communicate) createPool() {
	workers := new(sync.WaitGroup)
	for remaining := c.processorLimit; remaining > 0; remaining-- {
		workers.Add(1)
		go c.process(workers)
	}
	workers.Wait()
}

func isValidVoice(voice string) bool {
Expand Down
59 changes: 17 additions & 42 deletions edgeTTS/edgeTTS.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

type EdgeTTS struct {
communicator *Communicate
texts []CommunicateTextOption
texts []*CommunicateTextTask
outCome io.WriteCloser
}

Expand Down Expand Up @@ -92,68 +92,43 @@ func NewTTS(args Args) *EdgeTTS {
return &EdgeTTS{
communicator: tts,
outCome: file,
texts: []CommunicateTextOption{},
texts: []*CommunicateTextTask{},
}
}

func (eTTS *EdgeTTS) option(text string, voice string, rate string, volume string) CommunicateTextOption {
// voiceToUse := voice
// if voice == "" {
// voiceToUse = eTTS.communicator.option.voice
// }
// rateToUse := rate
// if rate == "" {
// rateToUse = eTTS.communicator.option.rate
// }
// volumeToUse := volume
// if volume == "" {
// volumeToUse = eTTS.communicator.option.volume
// }
// return CommunicateTextOption{
// text: text,
// voice: voiceToUse,
// rate: rateToUse,
// volume: volumeToUse,
// }
return CommunicateTextOption{
text: text,
voice: voice,
rate: rate,
volume: volume,
func (eTTS *EdgeTTS) task(text string, voice string, rate string, volume string) *CommunicateTextTask {
return &CommunicateTextTask{
text: text,
option: CommunicateTextOption{
voice: voice,
rate: rate,
volume: volume,
},
}
}

func (eTTS *EdgeTTS) AddTextDefault(text string) *EdgeTTS {
eTTS.texts = append(eTTS.texts, eTTS.option(text, "", "", ""))
eTTS.texts = append(eTTS.texts, eTTS.task(text, "", "", ""))
return eTTS
}

func (eTTS *EdgeTTS) AddTextWithVoice(text string, voice string) *EdgeTTS {
eTTS.texts = append(eTTS.texts, eTTS.option(text, voice, "", ""))
eTTS.texts = append(eTTS.texts, eTTS.task(text, voice, "", ""))
return eTTS
}

func (eTTS *EdgeTTS) AddText(text string, voice string, rate string, volume string) *EdgeTTS {
eTTS.texts = append(eTTS.texts, eTTS.option(text, voice, rate, volume))
eTTS.texts = append(eTTS.texts, eTTS.task(text, voice, rate, volume))
return eTTS
}

func (eTTS *EdgeTTS) Speak() {
defer eTTS.communicator.Close()
defer eTTS.communicator.close()
defer eTTS.outCome.Close()

go eTTS.communicator.allocateTask(eTTS.texts)
eTTS.communicator.createPool()
for _, text := range eTTS.texts {
task := eTTS.communicator.stream(text)
for {
v, ok := <-task
if ok {
if v.Type == ChunkTypeAudio {
eTTS.outCome.Write(v.Data)
// } else if v.Type == ChunkTypeWordBoundary {
} else if v.Type == ChunkTypeEnd {
break
}
}
}
eTTS.outCome.Write(text.speechData)
}
}
4 changes: 4 additions & 0 deletions examples/multiLine/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package main

import (
"fmt"
"strings"
"time"

"github.com/surfaceyu/edge-tts-go/edgeTTS"
)
Expand Down Expand Up @@ -52,12 +54,14 @@ func main() {
Voice: "",
WriteMedia: "./sample.mp3",
}
start := time.Now()
tts := edgeTTS.NewTTS(args)
for _, v := range contents {
speaker, text := splitSpeaker(v)
tts.AddTextWithVoice(text, speaker)
}
tts.Speak()
fmt.Printf("程序运行时间: %s", time.Since(start))
}

var displayShortMap = map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"strings"
)

const Version = "0.0.1"
const Version = "0.1.0"

func VersionInfo() (int, int, int) {
versionInfo := strings.Split(Version, ".")
Expand Down

0 comments on commit efe839a

Please sign in to comment.