more organization, cache db handle in store to simplify calls, addressing & federated delivery

This commit is contained in:
Will Murphy 2019-09-23 12:52:22 -05:00
parent a8230136f8
commit 53fa6353d2
16 changed files with 248 additions and 102 deletions

View file

@ -63,10 +63,11 @@ client.connect({ useNewUrlParser: true })
console.log('Connected successfully to server') console.log('Connected successfully to server')
db = client.db(dbName) db = client.db(dbName)
app.set('db', db) app.set('db', db)
store.connection.setDb(db)
return pub.actor.createLocalActor('dummy', 'Person') return pub.actor.createLocalActor('dummy', 'Person')
}) })
.then(dummy => { .then(dummy => {
return store.setup(db, DOMAIN, dummy) return store.setup(DOMAIN, dummy)
}) })
.then(() => { .then(() => {
https.createServer(sslOptions, app).listen(app.get('port-https'), function () { https.createServer(sslOptions, app).listen(app.get('port-https'), function () {

View file

@ -1,47 +1,27 @@
const { ObjectId } = require('mongodb')
// const activities = ['Create', ]
const pub = require('../pub') const pub = require('../pub')
function validateObject (object) {
if (object && object.id) {
object['@context'] = object['@context'] || pub.consts.ASContext
return true
}
}
function validateActivity (object) {
if (object && object.id && object.actor) {
return true
}
}
module.exports.activity = function activity (req, res, next) { module.exports.activity = function activity (req, res, next) {
// TODO real validation if (!pub.utils.validateActivity(req.body)) {
if (!validateActivity(req.body)) {
return res.status(400).send('Invalid activity') return res.status(400).send('Invalid activity')
} }
next() next()
} }
module.exports.outboxActivity = function outboxActivity (req, res, next) { module.exports.outboxActivity = function outboxActivity (req, res, next) {
if (!validateActivity(req.body)) { if (!pub.utils.validateActivity(req.body)) {
if (!validateObject(req.body)) { if (!pub.utils.validateObject(req.body)) {
return res.status(400).send('Invalid activity') return res.status(400).send('Invalid activity')
} }
const newID = new ObjectId() const actor = pub.utils.usernameToIRI(req.user)
req.body = { const extras = {}
_id: newID, if (req.body.bcc) {
'@context': pub.consts.ASContext, extras.bcc = req.body.bcc
type: 'Create',
id: `https://${req.app.get('domain')}/o/${newID.toHexString()}`,
actor: req.body.attributedTo,
object: req.body,
published: new Date().toISOString(),
to: req.body.to,
cc: req.body.cc,
bcc: req.body.cc,
audience: req.body.audience
} }
if (req.body.audience) {
extras.audience = req.body.audience
}
req.body = pub.activity
.build('Create', actor, req.body, req.body.to, req.body.cc, extras)
} }
next() next()
} }

71
pub/activity.js Normal file
View file

@ -0,0 +1,71 @@
'use strict'
const { ObjectId } = require('mongodb')
const store = require('../store')
const pubUtils = require('./utils')
const pubObject = require('./object')
const pubFederation = require('./federation')
module.exports = {
address,
addToOutbox,
build
}
function build (type, actorId, object, to, cc, etc) {
const oid = new ObjectId()
const act = Object.assign({
// _id: oid,
id: pubUtils.objectIdToIRI(oid),
type,
actor: actorId,
object,
to,
cc,
published: new Date().toISOString()
}, etc)
return act
}
async function address (activity) {
let audience = []
;['to', 'bto', 'cc', 'bcc', 'audience'].forEach(t => {
if (activity[t]) {
audience = audience.concat(activity[t])
}
})
audience = audience.map(t => {
if (t === 'https://www.w3.org/ns/activitystreams#Public') {
return null
}
return pubObject.resolveObject(t)
})
audience = await Promise.all(audience).then(addresses => {
// TODO: spec says only deliver to actor-owned collections
addresses = addresses.map(t => {
if (t && t.inbox) {
return t
}
if (t && t.items) {
return t.items.map(pubObject.resolveObject)
}
if (t && t.orderedItems) {
return t.orderedItems.map(pubObject.resolveObject)
}
})
// flattens and resolves collections
return Promise.all([].concat(...addresses))
})
audience = audience.filter(t => t && t.inbox)
.map(t => t.inbox)
// de-dupe
return Array.from(new Set(audience))
}
function addToOutbox (actor, activity) {
return Promise.all([
// ensure object is cached, but don't alter representation in activity
// so activities can be sent with objects as links
pubObject.resolve(activity.object),
store.stream.save(activity),
address(activity).then(addresses => pubFederation.deliver(actor, activity, addresses))
])
}

View file

@ -49,15 +49,15 @@ function createLocalActor (name, type) {
}) })
} }
async function getOrCreateActor (preferredUsername, db, includeMeta) { async function getOrCreateActor (preferredUsername, includeMeta) {
const id = pubUtils.usernameToIRI(preferredUsername) const id = pubUtils.usernameToIRI(preferredUsername)
let user = await store.actor.getActor(id, db, includeMeta) let user = await store.actor.getActor(id, includeMeta)
if (user) { if (user) {
return user return user
} }
// auto create groups whenever an unknown actor is referenced // auto create groups whenever an unknown actor is referenced
user = await createLocalActor(preferredUsername, 'Group') user = await createLocalActor(preferredUsername, 'Group')
await db.collection('objects').insertOne(user) await store.object.save(user)
// only executed on success // only executed on success
delete user._id delete user._id
if (includeMeta !== true) { if (includeMeta !== true) {

View file

@ -1,9 +1,11 @@
'use strict' 'use strict'
const request = require('request-promise-native') const request = require('request-promise-native')
const pubUtils = require('./utils')
// federation communication utilities // federation communication utilities
module.exports = { module.exports = {
requestObject requestObject,
deliver
} }
function requestObject (id) { function requestObject (id) {
@ -13,3 +15,29 @@ function requestObject (id) {
json: true json: true
}) })
} }
function deliver (actor, activity, addresses) {
if (activity.bto) {
delete activity.bto
}
if (activity.bcc) {
delete activity.bcc
}
const requests = addresses.map(addr => {
return request({
method: 'POST',
url: addr,
headers: {
'Content-Type': 'application/activity+json'
},
httpSignature: {
key: actor._meta.privateKey,
keyId: actor.id,
headers: ['(request-target)', 'host', 'date']
},
json: true,
body: pubUtils.toJSONLD(activity)
})
})
return Promise.all(requests)
}

View file

@ -1,6 +1,7 @@
'use strict' 'use strict'
// ActivityPub / ActivityStreams utils // ActivityPub / ActivityStreams utils
module.exports = { module.exports = {
activity: require('./activity'),
actor: require('./actor'), actor: require('./actor'),
consts: require('./consts'), consts: require('./consts'),
federation: require('./federation'), federation: require('./federation'),

View file

@ -1,17 +1,30 @@
'use strict' 'use strict'
const store = require('../store') const store = require('../store')
const federation = require('./federation') const federation = require('./federation')
const pubUtils = require('./utils')
module.exports = { module.exports = {
resolveObject resolveObject,
resolve: resolveObject
} }
// find object in local DB or fetch from origin server // find object in local DB or fetch from origin server
async function resolveObject (id, db) { async function resolveObject (id) {
let object = await store.object.get(id, db) let object
if (object) { if (pubUtils.validateObject(id)) {
return object // already an object
object = id
} else {
// resolve id to local object
object = await store.object.get(id)
if (object) {
return object
}
// resolve remote object from id
object = await federation.requestObject(id)
}
// cache non-collection objects
if (object.type !== 'Collection' && object.type !== 'OrderedCollection') {
await store.object.save(object)
} }
object = await federation.requestObject(id)
await store.object.save(object, db)
return object return object
} }

View file

@ -1,12 +1,15 @@
'use strict' 'use strict'
const config = require('../config.json') const config = require('../config.json')
const consts = require('./consts') const pubConsts = require('./consts')
module.exports = { module.exports = {
usernameToIRI, usernameToIRI,
toJSONLD, toJSONLD,
arrayToCollection, arrayToCollection,
actorFromActivity actorFromActivity,
objectIdToIRI,
validateActivity,
validateObject
} }
function actorFromActivity (activity) { function actorFromActivity (activity) {
@ -21,7 +24,7 @@ function actorFromActivity (activity) {
function arrayToCollection (arr, ordered) { function arrayToCollection (arr, ordered) {
return { return {
'@context': consts.ASContext, '@context': pubConsts.ASContext,
totalItems: arr.length, totalItems: arr.length,
type: ordered ? 'orderedCollection' : 'collection', type: ordered ? 'orderedCollection' : 'collection',
[ordered ? 'orderedItems' : 'items']: arr [ordered ? 'orderedItems' : 'items']: arr
@ -29,10 +32,30 @@ function arrayToCollection (arr, ordered) {
} }
function toJSONLD (obj) { function toJSONLD (obj) {
obj['@context'] = obj['@context'] || consts.ASContext obj['@context'] = obj['@context'] || pubConsts.ASContext
return obj return obj
} }
function usernameToIRI (user) { function usernameToIRI (user) {
return `https://${config.DOMAIN}/u/${user}` return `https://${config.DOMAIN}/u/${user}`
} }
function objectIdToIRI (oid) {
if (oid.toHexString) {
oid = oid.toHexString()
}
return `https://${config.DOMAIN}/o/${oid}`
}
function validateObject (object) {
if (object && object.id) {
// object['@context'] = object['@context'] || pubConsts.ASContext
return true
}
}
function validateActivity (object) {
if (object && object.id && object.actor) {
return true
}
}

View file

@ -2,58 +2,30 @@ const express = require('express')
const router = express.Router() const router = express.Router()
const pub = require('../pub') const pub = require('../pub')
const net = require('../net') const net = require('../net')
const request = require('request-promise-native') const store = require('../store')
const { ObjectId } = require('mongodb')
router.post('/', net.validators.activity, net.security.verifySignature, function (req, res) { router.post('/', net.validators.activity, net.security.verifySignature, function (req, res) {
const db = req.app.get('db')
req.body._meta = { _target: pub.utils.usernameToIRI(req.user) } req.body._meta = { _target: pub.utils.usernameToIRI(req.user) }
// side effects // side effects
switch (req.body.type) { switch (req.body.type) {
case 'Accept': case 'Accept':
// TODO - side effect ncessary for following collection? // TODO - side effect necessary for following collection?
break break
case 'Follow': case 'Follow':
req.body._meta._target = req.body.object.id req.body._meta._target = req.body.object.id
// send acceptance reply // send acceptance reply
Promise.all([ pub.actor.getOrCreateActor(req.user, true)
pub.actor.getOrCreateActor(req.user, db, true), .then(user => {
pub.object.resolveObject(pub.utils.actorFromActivity(req.body), db) const to = [pub.utils.actorFromActivity(req.body)]
]) const accept = pub.activity.build('Accept', user.id, req.body.id, to)
.then(([user, actor]) => { return pub.activity.addToOutbox(user, accept)
if (!actor || !actor.inbox) {
throw new Error('unable to send follow request acceptance: actor inbox not retrievable')
}
const newID = new ObjectId()
const responseOpts = {
method: 'POST',
url: actor.inbox,
headers: {
'Content-Type': 'application/activity+json'
},
httpSignature: {
key: user._meta.privateKey,
keyId: user.id,
headers: ['(request-target)', 'host', 'date']
},
json: true,
body: pub.utils.toJSONLD({
_id: newID,
type: 'Accept',
id: `https://${req.app.get('domain')}/o/${newID.toHexString()}`,
actor: user.id,
object: req.body
})
}
return request(responseOpts)
}) })
.then(result => console.log('success', result))
.catch(e => console.log(e)) .catch(e => console.log(e))
break break
} }
Promise.all([ Promise.all([
db.collection('objects').insertOne(req.body.object), pub.object.resolve(req.body.object),
db.collection('streams').insertOne(req.body) store.stream.save(req.body)
]).then(() => res.status(200).send()) ]).then(() => res.status(200).send())
.catch(err => { .catch(err => {
console.log(err) console.log(err)

View file

@ -2,13 +2,14 @@ const express = require('express')
const router = express.Router() const router = express.Router()
const net = require('../net') const net = require('../net')
const pub = require('../pub') const pub = require('../pub')
const store = require('../store')
router.post('/', net.validators.outboxActivity, function (req, res) { router.post('/', net.validators.outboxActivity, function (req, res) {
const db = req.app.get('db') store.actor.get(pub.utils.usernameToIRI(req.user), true)
Promise.all([ .then(actor => {
db.collection('objects').insertOne(req.body.object), return pub.activity.addToOutbox(actor, req.body)
db.collection('streams').insertOne(req.body) })
]).then(() => res.status(200).send()) .then(() => res.status(200).send())
.catch(err => { .catch(err => {
console.log(err) console.log(err)
res.status(500).send() res.status(500).send()

View file

@ -1,13 +1,15 @@
'use strict' 'use strict'
const connection = require('./connection')
module.exports = { module.exports = {
getActor getActor,
get: getActor
} }
const actorProj = { _id: 0, _meta: 0 } const actorProj = { _id: 0, _meta: 0 }
const metaActorProj = { _id: 0 } const metaActorProj = { _id: 0 }
function getActor (id, db, includeMeta) { function getActor (id, includeMeta) {
const db = connection.getDb()
return db.collection('objects') return db.collection('objects')
.find({ id: id }) .find({ id: id })
.limit(1) .limit(1)

8
store/connection.js Normal file
View file

@ -0,0 +1,8 @@
'use strict'
module.exports = (function () {
let con
return {
setDb: db => { con = db },
getDb: () => con
}
})()

View file

@ -3,6 +3,7 @@
module.exports = { module.exports = {
setup: require('./setup'), setup: require('./setup'),
actor: require('./actor'), actor: require('./actor'),
object: require('./object') object: require('./object'),
// stream: require('./stream'), stream: require('./stream'),
connection: require('./connection')
} }

View file

@ -1,19 +1,29 @@
'use strict' 'use strict'
const connection = require('./connection')
module.exports = { module.exports = {
get, get,
save save
} }
function get (id, db) { function get (id) {
return db.collection('objects') return connection.getDb()
.collection('objects')
.find({ id: id }) .find({ id: id })
.limit(1) .limit(1)
.project({ _id: 0, _meta: 0 }) .project({ _id: 0, _meta: 0 })
.next() .next()
} }
function save (object, db) { async function save (object) {
const db = connection.getDb()
const exists = await db.collection('objects')
.find({ id: object.id })
.project({ _id: 1 })
.limit(1)
.hasNext()
if (exists) {
return false
}
return db.collection('objects') return db.collection('objects')
.insertOne(object) .insertOne(object, { forceServerObjectId: true })
} }

View file

@ -1,6 +1,7 @@
'use strict' 'use strict'
const connection = require('./connection')
module.exports = async function dbSetup (db, domain, dummyUser) { module.exports = async function dbSetup (domain, dummyUser) {
const db = connection.getDb()
// inbox // inbox
await db.collection('streams').createIndex({ await db.collection('streams').createIndex({
'_meta._target': 1, '_meta._target': 1,

34
store/stream.js Normal file
View file

@ -0,0 +1,34 @@
'use strict'
const connection = require('./connection')
module.exports = {
// get,
save
}
// function get (id, type, db) {
// return db.collection('objects')
// .find({ id: id })
// .limit(1)
// .project({ _id: 0, _meta: 0 })
// .next()
// }
async function save (activity) {
const db = connection.getDb()
const q = { id: activity.id }
if (activity._meta && activity._meta._target) {
q['_meta._target'] = activity._meta._target
}
const exists = await db.collection('streams')
.find(q)
.project({ _id: 1 })
.limit(1)
.hasNext()
if (exists) {
return false
}
return db.collection('streams')
// server object ID avoids mutating local copy of document
.insertOne(activity, { forceServerObjectId: true })
}