178 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			178 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package smart
 | |
| 
 | |
| import (
 | |
| 	"errors"
 | |
| 	"gitea.unprism.ru/KRBL/n9m/v2/pkg/models"
 | |
| 	"gitea.unprism.ru/KRBL/n9m/v2/pkg/protocol"
 | |
| 	"gitea.unprism.ru/KRBL/n9m/v2/pkg/utils"
 | |
| 	"net"
 | |
| 	"sync"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
 | |
| 	length  = 6
 | |
| )
 | |
| 
 | |
| func NewAutoSmartChannelPackage(mainSmartPackage *SmartPackage) (*SmartChannelPackage, error) {
 | |
| 	conn, err := net.Dial(mainSmartPackage.conn.RemoteAddr().Network(), mainSmartPackage.conn.RemoteAddr().String())
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return NewSmartChannelPackage(conn, mainSmartPackage)
 | |
| }
 | |
| 
 | |
| func NewSmartChannelPackage(conn net.Conn, mainSmartPackage *SmartPackage) (*SmartChannelPackage, error) {
 | |
| 	pack := NewSmartPackage(conn)
 | |
| 	pack.pack.Payload.Session = mainSmartPackage.pack.Payload.Session
 | |
| 
 | |
| 	channelName, err := registerChannelHandles(pack)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	smartChannelPackage := &SmartChannelPackage{
 | |
| 		pack:     pack,
 | |
| 		mainPack: mainSmartPackage,
 | |
| 
 | |
| 		channelName: channelName,
 | |
| 
 | |
| 		mutex: sync.RWMutex{},
 | |
| 		ssrc:  make(map[uint16][]func([]byte) error),
 | |
| 	}
 | |
| 
 | |
| 	pack.AddPayloadHandler(protocol.PayloadTypeLive, smartChannelPackage.handleLiveVideo)
 | |
| 
 | |
| 	return smartChannelPackage, nil
 | |
| }
 | |
| 
 | |
| func (channelPack *SmartChannelPackage) Run() error {
 | |
| 	for {
 | |
| 		if err := channelPack.pack.Handle(); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (channelPack *SmartChannelPackage) GetChannelName() string {
 | |
| 	return channelPack.channelName
 | |
| }
 | |
| 
 | |
| func (channelPack *SmartChannelPackage) AddLiveSource(ssrc uint16, source func([]byte) error) {
 | |
| 	channelPack.mutex.Lock()
 | |
| 	defer channelPack.mutex.Unlock()
 | |
| 
 | |
| 	channelPack.ssrc[ssrc] = append(channelPack.ssrc[ssrc], source)
 | |
| }
 | |
| 
 | |
| func (channelPack *SmartChannelPackage) handleLiveVideo(sPack *SmartPackage, pack protocol.Package) error {
 | |
| 	channelPack.mutex.RLock()
 | |
| 
 | |
| 	sources, ok := channelPack.ssrc[uint16(pack.SSRC)]
 | |
| 
 | |
| 	if !ok || len(sources) == 0 {
 | |
| 		channelPack.mutex.RUnlock()
 | |
| 
 | |
| 		var request = models.MediaStreamModelControlStreamRequest{
 | |
| 			PayloadType: protocol.PayloadTypeLive,
 | |
| 			SSRC:        uint16(pack.SSRC),
 | |
| 			StreamName:  channelPack.channelName,
 | |
| 			Command:     models.MediaStreamCommandStop,
 | |
| 		}
 | |
| 
 | |
| 		pack.PayloadType = protocol.PayloadTypeData
 | |
| 		pack.Payload.Module = "MEDIASTREAMMODEL"
 | |
| 		pack.Payload.Operation = "CONTROLSTREAM"
 | |
| 		pack.SetParameters(request)
 | |
| 
 | |
| 		if _, err := channelPack.mainPack.Write(pack.PackPackage()); err != nil {
 | |
| 			panic(err)
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	var errorMask = map[int]bool{}
 | |
| 
 | |
| 	for i, source := range sources {
 | |
| 		if err := source(pack.RawPayload); err != nil {
 | |
| 			errorMask[i] = true
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	channelPack.mutex.RUnlock()
 | |
| 
 | |
| 	if len(errorMask) > 0 {
 | |
| 		newSources := make([]func([]byte) error, 0, len(sources))
 | |
| 
 | |
| 		for i, source := range sources {
 | |
| 			if !errorMask[i] {
 | |
| 				newSources = append(newSources, source)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		channelPack.mutex.Lock()
 | |
| 		channelPack.ssrc[uint16(pack.SSRC)] = newSources
 | |
| 		channelPack.mutex.Unlock()
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func registerChannelHandles(pack *SmartPackage) (string, error) {
 | |
| 	res := make(chan error, 1)
 | |
| 
 | |
| 	pack.AddJSONHandler("CERTIFICATE", "CREATESTREAM", func(smartPackage *SmartPackage, p protocol.Package) error {
 | |
| 		var params models.CertificateCreateStreamResponse
 | |
| 
 | |
| 		if err := p.GetResponseAs(¶ms); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if params.ErrorCode != 0 {
 | |
| 			res <- errors.New(params.ErrorCause)
 | |
| 		} else {
 | |
| 			res <- nil
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| 
 | |
| 	channelName := utils.RandomString(length, charset)
 | |
| 
 | |
| 	var request = models.CertificateCreateStreamRequest{
 | |
| 		StreamName: channelName,
 | |
| 	}
 | |
| 
 | |
| 	pack.pack.Payload.Module = "CERTIFICATE"
 | |
| 	pack.pack.Payload.Operation = "CREATESTREAM"
 | |
| 	pack.pack.SetParameters(request)
 | |
| 
 | |
| 	pack.Write(pack.pack.PackPackage())
 | |
| 
 | |
| 	for {
 | |
| 		if err := pack.Handle(); err != nil {
 | |
| 			return "", err
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case err := <-res:
 | |
| 			close(res)
 | |
| 
 | |
| 			if err != nil {
 | |
| 				return "", err
 | |
| 			}
 | |
| 
 | |
| 			pack.AddJSONHandler("CERTIFICATE", "CREATESTREAM", func(smartPackage *SmartPackage, p protocol.Package) error {
 | |
| 				return errors.New("stream already exists")
 | |
| 			})
 | |
| 
 | |
| 			return channelName, nil
 | |
| 		default:
 | |
| 		}
 | |
| 	}
 | |
| }
 |