Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions deal/deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package deal
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

Expand Down Expand Up @@ -281,14 +283,29 @@ func (m *manager) CheckContentReadyForDealMaking(ctx context.Context, content *u
return nil
}

type DeltaRequest struct {
Cid string `json:"cid"`
Miner string `json:"miner"`
ConnectionMode string `json:"connection_mode"`
RemoveUnsealedCopies bool `json:"remove_unsealed_copies"`
SkipIpniAnnounce bool `json:"skip_ipni_announce"`
}

func (m *manager) makeDealsForContent(ctx context.Context, contID uint64, dealsToBeMade int) error {
ctx, span := m.tracer.Start(ctx, "makeDealsForContent", trace.WithAttributes(
attribute.Int64("content", int64(contID)),
attribute.Int("count", dealsToBeMade),
))
defer span.End()

var deltaRequest DeltaRequest
// set up delta as a separate instance but use the same BLOCKSTORE
// get the content id
// get the list of miners
// make deal for each content

content, err := m.contMgr.GetContent(contID)
deltaRequest.Cid = content.Cid.CID.String()
if err != nil {
return err
}
Expand Down Expand Up @@ -332,14 +349,39 @@ func (m *manager) makeDealsForContent(ctx context.Context, contID uint64, dealsT
continue
}

if _, err := m.MakeDealWithMiner(ctx, content, mn.Address); err != nil {
m.log.Warnf("failed to make deal for cont: %d, with miner: %s - %s", contID, mn.Address, err)
continue
//if _, err := m.MakeDealWithMiner(ctx, content, mn.Address); err != nil {
// m.log.Warnf("failed to make deal for cont: %d, with miner: %s - %s", contID, mn.Address, err)
// continue
//}

//if err := m.dealQueueMgr.MadeOneDeal(contID, m.db); err != nil {
// return err
//}
fmt.Println("making deal with miner: ", mn.Address.String())
deltaRequest.Miner = mn.Address.String()
deltaRequest.ConnectionMode = "e2e"
deltaRequest.RemoveUnsealedCopies = true
deltaRequest.SkipIpniAnnounce = true

// make the deal
payloadBytes, err := json.Marshal(deltaRequest)
if err != nil {
// handle err
}
body := bytes.NewReader(payloadBytes)

req, err := http.NewRequest("POST", "http://localhost:1414/api/v1/deal/existing/content", body)
if err != nil {
// handle err
}
req.Header.Set("Authorization", "Bearer [ESTUARY_API_KEY]")
req.Header.Set("Content-Type", "application/json")

if err := m.dealQueueMgr.MadeOneDeal(contID, m.db); err != nil {
return err
resp, err := http.DefaultClient.Do(req)
if err != nil {
// handle err
}
defer resp.Body.Close()

excludedMiners[mn.Address] = true
break
Expand Down