Solution to HTTP Downloader by Георги Костадинов

Results

  • 7 points from tests
  • 0 bonus points
  • 7 points total
  • 12 passing test(s)
  • 5 failing test(s)

Code

package main

import (
    "context"
    "errors"
    "io"
    "io/ioutil"
    "net/http"
    "strconv"
    "sync"
    "time"
)

// Reader wraps an in-memory pipe and carries an error that is
// surfaced to the consumer alongside the piped data.
type Reader struct {
    pipeReader io.PipeReader
    pipeWriter io.PipeWriter
    err        error
}

func NewReader(pipeReader io.PipeReader, pipeWriter io.PipeWriter) *Reader {
    return &Reader{pipeReader, pipeWriter, nil}
}

// Read proxies to the pipe; when the pipe itself reports no error,
// any error recorded via SetError or Write is returned instead.
func (r *Reader) Read(p []byte) (n int, err error) {
    n, err = r.pipeReader.Read(p)
    if err == nil {
        err = r.err
    }

    return n, err
}

// Write records err for later reads and pushes bytes into the pipe.
func (r *Reader) Write(bytes []byte, err error) {
    r.err = err
    r.pipeWriter.Write(bytes)
}

func (r *Reader) SetError(err error) error {
    r.err = err
    return err
}

// maxUrls caps how many source URLs (and buffered results) are supported.
const maxUrls int = 128

// Url pairs a source URL with its index in the input slice.
type Url struct {
    id  int
    url string
}

// Task asks one URL for the byte range [rangeStart, rangeEnd].
type Task struct {
    url        Url
    rangeStart int
    rangeEnd   int
}

// TaskResult carries the bytes fetched for a task; taskSkipped is
// set when the task's URL was marked invalid along the way.
type TaskResult struct {
    task        *Task
    taskSkipped bool
    result      []byte
}

// Queue is a FIFO of pending download tasks.
type Queue []*Task

func (q *Queue) Push(n *Task) {
    *q = append(*q, n)
}

func (q *Queue) Pop() (n *Task) {
    n = (*q)[0]
    *q = (*q)[1:]
    return
}

func (q *Queue) Len() int {
    return len(*q)
}

// getContentLength issues a HEAD request to every URL and keeps the
// Content-Length of the last one that responds; 0 if none succeed.
func getContentLength(urls []Url) int {
    contentLength := -1

    for _, url := range urls {
        response, err := http.Head(url.url)
        if err != nil {
            continue
        }
        contentLength = int(response.ContentLength)
    }
    if contentLength < 0 {
        contentLength = 0
    }

    return contentLength
}

// generateTasks splits [rangeStart, rangeEnd] into one near-equal
// bucket per URL: every bucket gets the even share, and the first
// surPlus buckets get one extra byte each.
func generateTasks(urls []Url, rangeStart int, rangeEnd int) Queue {
    var tasks Queue

    numberOfUrls := len(urls)
    bucketSizes := make([]int, numberOfUrls)

    evenLength := int(float32(rangeEnd-rangeStart+1) / float32(numberOfUrls))
    for i := 0; i < numberOfUrls; i++ {
        bucketSizes[i] = evenLength
    }

    // Spread the remainder one byte at a time over the buckets.
    surPlus := (rangeEnd - rangeStart + 1) % numberOfUrls
    for i := 0; surPlus > 0; surPlus-- {
        bucketSizes[i] += 1
        i = (i + 1) % numberOfUrls
    }

    // Turn the bucket sizes into concrete [rangeLow, rangeHigh] tasks.
    k := rangeStart
    for i := 0; i < numberOfUrls && k <= rangeEnd; i++ {
        rangeLow := k
        rangeHigh := k + bucketSizes[i] - 1
        k += bucketSizes[i]

        url := urls[i]
        tasks.Push(&Task{url, rangeLow, rangeHigh})
    }

    return tasks
}
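
// Worked example of the split above (hypothetical numbers): a
// 10-byte range (rangeStart = 0, rangeEnd = 9) over 3 URLs gives
// evenLength = 3 and surPlus = 1, so the bucket sizes come out as
// [4 3 3] and the generated tasks cover bytes 0-3, 4-6 and 7-9.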

// getRangedRequest GETs bytes [rangeStart, rangeEnd] of url. A nil
// body marks the URL as failed; repeat == true tells the caller the
// response did not match the requested range and the remainder
// should be re-requested.
func getRangedRequest(httpClient *http.Client, url string, rangeStart int, rangeEnd int) ([]byte, bool) {
    request, err := http.NewRequest("GET", url, nil)
    if err != nil {
        return nil, false
    }

    request.Header.Add("Range", "bytes="+strconv.Itoa(rangeStart)+"-"+strconv.Itoa(rangeEnd))
    response, err := httpClient.Do(request)
    if err != nil {
        return nil, false
    }

    defer response.Body.Close()
    body, err := ioutil.ReadAll(response.Body)
    if err != nil {
        // The body arrived incomplete; hand back what was read and
        // let the caller retry the rest.
        return body, true
    }
    statusCode := int(response.StatusCode)
    if statusCode >= 300 && statusCode < 600 {
        return nil, false
    }

    // rangeEnd == 1 is the sentinel used when the total length is
    // unknown, in which case a short response is not retried.
    repeat := false
    if rangeEnd != 1 && int(response.ContentLength)-1 != (rangeEnd-rangeStart) {
        repeat = true
    }

    return body, repeat
}
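
// For illustration (hypothetical request): rangeStart = 0 and
// rangeEnd = 3 produce the header "Range: bytes=0-3", asking the
// server for the first four bytes of the resource; a server that
// honours ranges answers with 206 Partial Content.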

// getValidUrls filters out the URLs already marked as failed.
func getValidUrls(urls []Url, invalidUrls [maxUrls]bool) []Url {
    validUrls := make([]Url, 0)
    for _, url := range urls {
        if invalidUrls[url.id] {
            continue
        }
        validUrls = append(validUrls, url)
    }

    return validUrls
}

// DownloadFile starts downloading the file served by urlStrings and
// returns a Reader that streams the assembled bytes. The byte range
// is split between the URLs; ranges owned by a URL that fails are
// redistributed between the URLs that still work.
func DownloadFile(ctx context.Context, urlStrings []string) io.Reader {
    pr, pw := io.Pipe()
    outputReader := NewReader(*pr, *pw)

    // The "max-connections" context value caps parallel requests;
    // unset (or larger than the URL count) means one per URL.
    numberOfUrls := len(urlStrings)
    maxConcurrency := 0
    ctxMaxConcurrency, ok := ctx.Value("max-connections").(int)
    if ok {
        maxConcurrency = ctxMaxConcurrency
    }
    if maxConcurrency == 0 || maxConcurrency >= numberOfUrls {
        maxConcurrency = numberOfUrls
    }

    urls := make([]Url, 0)
    for id, urlString := range urlStrings {
        urls = append(urls, Url{id, urlString})
    }

    var invalidUrls [maxUrls]bool

    go func(outputReader *Reader) {
        var wg sync.WaitGroup
        var errOnce sync.Once
        cancel := func() {}
        if ctx != nil {
            ctx, cancel = context.WithCancel(ctx)
        }

        // rangeEnd == 1 doubles as the "length unknown" sentinel
        // that getRangedRequest checks before retrying.
        contentLength := getContentLength(urls)
        rangeEnd := contentLength - 1
        if contentLength-1 < 0 {
            rangeEnd = 1
        }
        tasksQueue := generateTasks(urls, 0, rangeEnd)

        httpClient := &http.Client{}
        // throttling acts as a counting semaphore for the workers.
        throttling := make(chan struct{}, maxConcurrency)
        results := make(chan TaskResult, maxUrls)

        // justAdded and stopMainFor are shared with the worker
        // goroutines without synchronization.
        justAdded := false
        stopMainFor := false
        for {
            time.Sleep(20 * time.Millisecond)
            if stopMainFor {
                break
            }

            if tasksQueue.Len() == 0 {
                continue
            }

            task := tasksQueue.Pop()
            throttling <- struct{}{}

            if !justAdded {
                wg.Add(1)
            }
            if justAdded {
                justAdded = false
            }

            go func(task *Task, wg *sync.WaitGroup) {
                defer func() {
                    if !justAdded {
                        wg.Done()
                    }
                }()

                processTask := func() error {
                    // Bail out if the context has been cancelled.
                    if ctx != nil {
                        select {
                        case <-ctx.Done():
                            return outputReader.SetError(ctx.Err())
                        default:
                        }
                    }

                    var taskBytesOut []byte
                    rangeStart := task.rangeStart
                    rangeEnd := task.rangeEnd
                    for {
                        taskBytes, repeat := getRangedRequest(httpClient, task.url.url, rangeStart, rangeEnd)
                        if taskBytes == nil {
                            // The URL failed: mark it invalid and re-split
                            // the unfinished range over the remaining URLs.
                            invalidUrls[task.url.id] = true
                            validUrls := getValidUrls(urls, invalidUrls)
                            if len(validUrls) > 0 {
                                newTasksQueue := generateTasks(validUrls, rangeStart, rangeEnd)
                                for newTasksQueue.Len() > 0 {
                                    tasksQueue.Push(newTasksQueue.Pop())
                                    justAdded = true
                                }
                            }
                            break
                        }

                        if len(taskBytesOut) == 0 {
                            taskBytesOut = taskBytes
                        } else {
                            taskBytesOut = append(taskBytesOut, taskBytes...)
                        }

                        if !repeat {
                            break
                        }

                        // Skip past the bytes already received and
                        // request the remainder of the range.
                        if (rangeStart + len(taskBytes)) > rangeEnd {
                            continue
                        } else {
                            rangeStart += len(taskBytes)
                        }
                    }

                    results <- TaskResult{task, invalidUrls[task.url.id], taskBytesOut}

                    return nil
                }

                if err := processTask(); err != nil {
                    errOnce.Do(func() {
                        cancel()
                    })
                }

                <-throttling

                go func() {
                    if tasksQueue.Len() == 0 {
                        stopMainFor = true
                    }
                }()
            }(task, &wg)
        }

        // Collector: wait for the workers, stitch the pieces back
        // together by offset and flush them into the pipe.
        go func(wg *sync.WaitGroup) {
            defer pw.Close()
            wg.Wait()
            close(throttling)
            close(results)

            taskCount := 0

            totalOutput := make([]byte, contentLength)
            receivedBytes := 0
            for result := range results {
                for index, taskByte := range result.result {
                    totalOutput[index+result.task.rangeStart] = taskByte
                    receivedBytes += 1
                }
                taskCount += 1
            }

            var err error
            validUrls := getValidUrls(urls, invalidUrls)
            if len(validUrls) == 0 {
                err = errors.New("no valid urls")
            }
            if taskCount == 0 {
                outputReader.Write(nil, err)
            } else {
                outputReader.Write(totalOutput[:receivedBytes], err)
            }
        }(&wg)
    }(outputReader)

    return outputReader
}
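
A minimal usage sketch of the solution's entry point, placed in a second file of the same package main; the mirror URLs and the 2-connection limit are hypothetical, while the "max-connections" context key is the one DownloadFile itself reads:

package main

import (
    "context"
    "io"
    "os"
)

func main() {
    // Cap the downloader at 2 parallel connections through the
    // context value that DownloadFile looks up.
    ctx := context.WithValue(context.Background(), "max-connections", 2)

    // Hypothetical mirrors serving the same file.
    urls := []string{
        "http://example.com/file.bin",
        "http://mirror.example.com/file.bin",
    }

    // Stream the assembled bytes to stdout; errors recorded by the
    // downloader surface through the returned Reader.
    if _, err := io.Copy(os.Stdout, DownloadFile(ctx, urls)); err != nil {
        panic(err)
    }
}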

Execution log

History (1 version and 0 comments)

Георги updated the solution on 03.01.2017 13:56 (over 1 year ago)
