package smart import ( "errors" "gitea.unprism.ru/KRBL/n9m/v2/pkg/models" "gitea.unprism.ru/KRBL/n9m/v2/pkg/protocol" "gitea.unprism.ru/KRBL/n9m/v2/pkg/utils" "net" "sync" ) const ( charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" length = 6 ) func NewSmartChannelPackage(conn net.Conn, mainSmartPackage *SmartPackage) (*SmartChannelPackage, error) { pack := NewSmartPackage(conn) pack.pack.Payload.Session = mainSmartPackage.pack.Payload.Session channelName, err := registerChannelHandles(pack) if err != nil { return nil, err } smartChannelPackage := &SmartChannelPackage{ pack: pack, mainPack: mainSmartPackage, channelName: channelName, mutex: sync.RWMutex{}, ssrc: make(map[uint16][]func([]byte) error), } pack.AddPayloadHandler(protocol.PayloadTypeLive, smartChannelPackage.handleLiveVideo) return smartChannelPackage, nil } func (channelPack *SmartChannelPackage) Run() error { for { if err := channelPack.pack.Handle(); err != nil { return err } } } func (channelPack *SmartChannelPackage) GetChannelName() string { return channelPack.channelName } func (channelPack *SmartChannelPackage) AddLiveSource(ssrc uint16, source func([]byte) error) { channelPack.mutex.Lock() defer channelPack.mutex.Unlock() channelPack.ssrc[ssrc] = append(channelPack.ssrc[ssrc], source) } func (channelPack *SmartChannelPackage) handleLiveVideo(sPack *SmartPackage, pack protocol.Package) error { channelPack.mutex.RLock() sources, ok := channelPack.ssrc[uint16(pack.SSRC)] if !ok || len(sources) == 0 { channelPack.mutex.RUnlock() var request = models.MediaStreamModelControlStreamRequest{ PayloadType: protocol.PayloadTypeLive, SSRC: uint16(pack.SSRC), StreamName: channelPack.channelName, Command: models.MediaStreamCommandStop, } pack.PayloadType = protocol.PayloadTypeData pack.Payload.Module = "MEDIASTREAMMODEL" pack.Payload.Operation = "CONTROLSTREAM" pack.SetParameters(request) if _, err := channelPack.mainPack.Write(pack.PackPackage()); err != nil { panic(err) } return nil } var errorMask = map[int]bool{} for i, source := range sources { if err := source(pack.RawPayload); err != nil { errorMask[i] = true } } channelPack.mutex.RUnlock() if len(errorMask) > 0 { newSources := make([]func([]byte) error, 0, len(sources)) for i, source := range sources { if !errorMask[i] { newSources = append(newSources, source) } } channelPack.mutex.Lock() channelPack.ssrc[uint16(pack.SSRC)] = newSources channelPack.mutex.Unlock() } return nil } func registerChannelHandles(pack *SmartPackage) (string, error) { res := make(chan error, 1) pack.AddJSONHandler("CERTIFICATE", "CREATESTREAM", func(smartPackage *SmartPackage, p protocol.Package) error { var params models.CertificateCreateStreamResponse if err := p.GetResponseAs(¶ms); err != nil { return err } if params.ErrorCode != 0 { res <- errors.New(params.ErrorCause) } else { res <- nil } return nil }) channelName := utils.RandomString(length, charset) var request = models.CertificateCreateStreamRequest{ StreamName: channelName, } pack.pack.Payload.Module = "CERTIFICATE" pack.pack.Payload.Operation = "CREATESTREAM" pack.pack.SetParameters(request) pack.Write(pack.pack.PackPackage()) for { if err := pack.Handle(); err != nil { return "", err } select { case err := <-res: close(res) if err != nil { return "", err } pack.AddJSONHandler("CERTIFICATE", "CREATESTREAM", func(smartPackage *SmartPackage, p protocol.Package) error { return errors.New("stream already exists") }) return channelName, nil default: } } }