// Package chunk serves byte ranges of torrent files as fixed-size chunks that
// are looked up in a chunk storage and handed to a downloader when missing.
package chunk

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"os"

	"github.com/debridmediamanager.com/zurg/internal/config"
	"github.com/debridmediamanager.com/zurg/internal/torrent"
)

// Manager manages chunks on disk.
type Manager struct {
	// ChunkSize is the size of one chunk in bytes. NewManager requires it to
	// be a multiple of the OS page size.
	ChunkSize int64
	// LoadAhead is the number of chunks following a requested chunk that are
	// queued as preload requests.
	LoadAhead  int
	downloader *Downloader
	storage    *Storage
	queue      chan *QueueEntry
}

// QueueEntry is one unit of work for the checker goroutines. A nil response
// channel marks a preload-only entry whose result nobody waits for.
type QueueEntry struct {
	request  *Request
	response chan Response
}

// RequestID is the binary identifier for a chunk request: the first 16 bytes
// are a truncated SHA-256 of the file identifier, the last 8 bytes are the
// big-endian chunk start offset.
type RequestID [24]byte

// String renders the ID as "<32 hex digits of the file hash>:<chunk offset>".
func (id RequestID) String() string {
	return fmt.Sprintf("%032x:%v", id[:16], binary.BigEndian.Uint64(id[16:]))
}

// Request represents a chunk request.
type Request struct {
	id   RequestID
	file *torrent.File
	// offsetStart and offsetEnd delimit the chunk-aligned range in the file.
	offsetStart int64
	offsetEnd   int64
	// chunkOffset and chunkOffsetEnd delimit the wanted bytes inside the chunk.
	chunkOffset    int64
	chunkOffsetEnd int64
	// sequence is the index of this request among the ranges of one GetChunk call.
	sequence int
	preload  bool
}

// Response represents a chunk response.
type Response struct {
	Sequence int
	Error    error
	Bytes    []byte
}

// NewManager creates a new chunk manager.
//
// chunkSize must be at least the OS page size, a multiple of it, and must not
// exceed the maximum value of a signed machine word. maxChunks must be at
// least 2 and at least loadAhead. On success the storage is cleared and
// checkThreads goroutines are started to process the request queue; they run
// for the lifetime of the process.
func NewManager(
	chunkFile string,
	chunkSize int64,
	loadAhead,
	checkThreads,
	loadThreads,
	maxChunks int,
	t *torrent.TorrentManager,
	c config.ConfigInterface) (*Manager, error) {

	// Capacity of the request queue shared by all checker goroutines.
	const queueSize = 100

	pageSize := int64(os.Getpagesize())
	if chunkSize < pageSize {
		return nil, fmt.Errorf("chunk size must not be < %v", pageSize)
	}
	if chunkSize%pageSize != 0 {
		return nil, fmt.Errorf("chunk size must be divideable by %v", pageSize)
	}

	// 32-Bit: ~2GB / 64-Bit: ~8EB
	maxMmapSize := int64(^uint(0) >> 1)
	if chunkSize > maxMmapSize {
		return nil, fmt.Errorf("chunk size must be < %v", maxMmapSize)
	}

	// NOTE(review): the check accepts maxChunks == 2 and maxChunks == loadAhead,
	// which is more permissive than the wording of the message below.
	if maxChunks < 2 || maxChunks < loadAhead {
		return nil, fmt.Errorf("max-chunks must be greater than 2 and bigger than the load ahead value")
	}

	storage, err := NewStorage(chunkSize, maxChunks, maxMmapSize, chunkFile)
	if err != nil {
		return nil, err
	}

	downloader, err := NewDownloader(loadThreads, storage, chunkSize, t, c)
	if err != nil {
		return nil, err
	}

	manager := &Manager{
		ChunkSize:  chunkSize,
		LoadAhead:  loadAhead,
		downloader: downloader,
		storage:    storage,
		queue:      make(chan *QueueEntry, queueSize),
	}

	if err := manager.storage.Clear(); err != nil {
		return nil, err
	}

	for i := 0; i < checkThreads; i++ {
		go manager.thread()
	}

	return manager, nil
}

// GetChunk loads one chunk and starts the preload for the next chunks.
//
// It returns up to size bytes of object starting at offset; the size is
// clamped so the read never extends past object.Bytes. A read that spans
// several chunks is split into one request per chunk and reassembled in
// order. An error is returned for a nil object, a negative offset or size, an
// offset beyond the end of the file, a failed chunk, or an empty chunk
// response.
func (m *Manager) GetChunk(object *torrent.File, offset, size int64) ([]byte, error) {
	if object == nil {
		return nil, fmt.Errorf("tried to read from a nil file")
	}
	if offset < 0 || size < 0 {
		return nil, fmt.Errorf("invalid read of %v at offset %v with size %v", object.ID, offset, size)
	}
	if m.ChunkSize <= 0 {
		return nil, fmt.Errorf("invalid chunk size %v", m.ChunkSize)
	}

	maxOffset := object.Bytes
	if offset > maxOffset {
		return nil, fmt.Errorf("tried to read past EOF of %v at offset %v", object.ID, offset)
	}
	// Clamp without computing offset+size, which could overflow int64.
	if size > maxOffset-offset {
		size = maxOffset - offset
	}

	ranges := splitChunkRanges(offset, size, m.ChunkSize)
	numRanges := len(ranges)
	// Buffered for every expected response, so senders never block even when
	// this function returns early on an error.
	responses := make(chan Response, numRanges)

	last := numRanges - 1
	for i, r := range ranges {
		// Only the final range triggers the read-ahead of following chunks.
		m.requestChunk(object, r.offset, r.size, i, i == last, responses)
	}

	data := make([]byte, size)
	for i := 0; i < numRanges; i++ {
		res := <-responses
		if res.Error != nil {
			return nil, res.Error
		}

		// Responses may arrive in any order; Sequence locates the slot.
		// NOTE(review): a response shorter than its range leaves the tail of
		// the slot zero-filled without an error — confirm this is intended.
		dataOffset := ranges[res.Sequence].offset - offset
		if n := copy(data[dataOffset:], res.Bytes); n == 0 {
			return nil, fmt.Errorf("request %v slice %v has empty response", object.ID, res.Sequence)
		}
	}

	// The channel is intentionally not closed: nothing ranges over it, and
	// this function is its receiver, not its sender.
	return data, nil
}

// buildRequestID derives the chunk identifier from the file's Link (falling
// back to its Path when the link is empty) and the chunk start offset.
func buildRequestID(object *torrent.File, offset int64) (id RequestID) {
	fileID := object.Link
	if fileID == "" {
		fileID = object.Path
	}
	hash := sha256.Sum256([]byte(fileID))
	copy(id[:16], hash[:16])
	binary.BigEndian.PutUint64(id[16:], uint64(offset))
	return id
}

// requestChunk queues the request for the chunk containing offset and, when
// preload is true, additionally queues up to LoadAhead preload requests for
// the chunks that follow it. Sends block while the queue is full.
func (m *Manager) requestChunk(object *torrent.File, offset, size int64, sequence int, preload bool, response chan Response) {
	chunkOffset := offset % m.ChunkSize
	offsetStart := offset - chunkOffset
	offsetEnd := offsetStart + m.ChunkSize

	m.queue <- &QueueEntry{
		request: &Request{
			id:             buildRequestID(object, offsetStart),
			file:           object,
			offsetStart:    offsetStart,
			offsetEnd:      offsetEnd,
			chunkOffset:    chunkOffset,
			chunkOffsetEnd: chunkOffset + size,
			sequence:       sequence,
			preload:        false,
		},
		response: response,
	}

	if !preload {
		return
	}

	for i := m.ChunkSize; i < (m.ChunkSize * int64(m.LoadAhead+1)); i += m.ChunkSize {
		aheadOffsetStart := offsetStart + i
		aheadOffsetEnd := aheadOffsetStart + m.ChunkSize
		// Only chunks that end strictly before the end of the file are
		// preloaded; a trailing partial chunk is skipped.
		if uint64(aheadOffsetStart) < uint64(object.Bytes) && uint64(aheadOffsetEnd) < uint64(object.Bytes) {
			// No response channel: nobody waits for a preload result.
			m.queue <- &QueueEntry{
				request: &Request{
					id:          buildRequestID(object, aheadOffsetStart),
					file:        object,
					offsetStart: aheadOffsetStart,
					offsetEnd:   aheadOffsetEnd,
					preload:     true,
				},
			}
		}
	}
}

// byteRange is a contiguous span of a file that lies within a single chunk.
type byteRange struct {
	offset, size int64
}

// Calculate request ranges that span multiple chunks
//
// This can happen with Direct-IO and unaligned reads or
// if the size is bigger than the chunk size.
//
// The returned ranges are contiguous, in ascending order, and none crosses a
// chunk boundary. A non-positive size yields no ranges.
func splitChunkRanges(offset, size, chunkSize int64) []byteRange {
	if size <= 0 {
		return nil
	}

	ranges := make([]byteRange, 0, size/chunkSize+2)
	for remaining := size; remaining > 0; {
		// Take at most the bytes left up to the end of the current chunk.
		part := min(remaining, chunkSize-offset%chunkSize)
		ranges = append(ranges, byteRange{offset, part})
		offset += part
		remaining -= part
	}
	return ranges
}

// thread processes queue entries forever; it is run as a goroutine by
// NewManager.
func (m *Manager) thread() {
	for {
		queueEntry := <-m.queue
		m.checkChunk(queueEntry.request, queueEntry.response)
	}
}

// checkChunk answers req from storage when the chunk is present and hands it
// to the downloader otherwise. With a nil response channel the request is a
// preload: the download is triggered if needed and no result is delivered.
func (m *Manager) checkChunk(req *Request, response chan Response) {
	if response == nil {
		if m.storage.Load(req.id) == nil {
			m.downloader.Download(req, nil)
		}
		return
	}

	if bytes := m.storage.Load(req.id); bytes != nil {
		response <- Response{
			Sequence: req.sequence,
			Bytes:    adjustResponseChunk(req, bytes),
		}
		return
	}

	m.downloader.Download(req, func(err error, bytes []byte) {
		response <- Response{
			Sequence: req.sequence,
			Error:    err,
			Bytes:    adjustResponseChunk(req, bytes),
		}
	})
}

// adjustResponseChunk cuts the requested window out of a chunk's bytes. Both
// window bounds are clamped to the chunk length, so a short chunk yields a
// shorter (possibly empty) slice. The result shares memory with bytes; nil
// input yields nil.
func adjustResponseChunk(req *Request, bytes []byte) []byte {
	if bytes == nil {
		return nil
	}
	bytesLen := int64(len(bytes))
	sOffset := min(req.chunkOffset, bytesLen)
	eOffset := min(req.chunkOffsetEnd, bytesLen)
	return bytes[sOffset:eOffset]
}

// min returns the smaller of x and y.
func min(x, y int64) int64 {
	if x < y {
		return x
	}
	return y
}