From 53fa6353d21e2dfea0f4d69661e1018ba07f59f3 Mon Sep 17 00:00:00 2001 From: Will Murphy Date: Mon, 23 Sep 2019 12:52:22 -0500 Subject: [PATCH] more organization, cache db handle in store to simplify calls, addressing & federated delivery --- index.js | 3 +- net/validators.js | 44 ++++++++-------------------- pub/activity.js | 71 +++++++++++++++++++++++++++++++++++++++++++++ pub/actor.js | 6 ++-- pub/federation.js | 30 ++++++++++++++++++- pub/index.js | 1 + pub/object.js | 27 ++++++++++++----- pub/utils.js | 31 +++++++++++++++++--- routes/inbox.js | 46 ++++++----------------------- routes/outbox.js | 11 +++---- store/actor.js | 8 +++-- store/connection.js | 8 +++++ store/index.js | 5 ++-- store/object.js | 20 +++++++++---- store/setup.js | 5 ++-- store/stream.js | 34 ++++++++++++++++++++++ 16 files changed, 248 insertions(+), 102 deletions(-) create mode 100644 pub/activity.js create mode 100644 store/connection.js create mode 100644 store/stream.js diff --git a/index.js b/index.js index 1bc666b..ca368de 100644 --- a/index.js +++ b/index.js @@ -63,10 +63,11 @@ client.connect({ useNewUrlParser: true }) console.log('Connected successfully to server') db = client.db(dbName) app.set('db', db) + store.connection.setDb(db) return pub.actor.createLocalActor('dummy', 'Person') }) .then(dummy => { - return store.setup(db, DOMAIN, dummy) + return store.setup(DOMAIN, dummy) }) .then(() => { https.createServer(sslOptions, app).listen(app.get('port-https'), function () { diff --git a/net/validators.js b/net/validators.js index 33551d6..925b712 100644 --- a/net/validators.js +++ b/net/validators.js @@ -1,47 +1,27 @@ -const { ObjectId } = require('mongodb') -// const activities = ['Create', ] const pub = require('../pub') -function validateObject (object) { - if (object && object.id) { - object['@context'] = object['@context'] || pub.consts.ASContext - return true - } -} - -function validateActivity (object) { - if (object && object.id && object.actor) { - return true - } -} - module.exports.activity = function activity (req, res, next) { - // TODO real validation - if (!validateActivity(req.body)) { + if (!pub.utils.validateActivity(req.body)) { return res.status(400).send('Invalid activity') } next() } module.exports.outboxActivity = function outboxActivity (req, res, next) { - if (!validateActivity(req.body)) { - if (!validateObject(req.body)) { + if (!pub.utils.validateActivity(req.body)) { + if (!pub.utils.validateObject(req.body)) { return res.status(400).send('Invalid activity') } - const newID = new ObjectId() - req.body = { - _id: newID, - '@context': pub.consts.ASContext, - type: 'Create', - id: `https://${req.app.get('domain')}/o/${newID.toHexString()}`, - actor: req.body.attributedTo, - object: req.body, - published: new Date().toISOString(), - to: req.body.to, - cc: req.body.cc, - bcc: req.body.cc, - audience: req.body.audience + const actor = pub.utils.usernameToIRI(req.user) + const extras = {} + if (req.body.bcc) { + extras.bcc = req.body.bcc } + if (req.body.audience) { + extras.audience = req.body.audience + } + req.body = pub.activity + .build('Create', actor, req.body, req.body.to, req.body.cc, extras) } next() } diff --git a/pub/activity.js b/pub/activity.js new file mode 100644 index 0000000..b1dda40 --- /dev/null +++ b/pub/activity.js @@ -0,0 +1,71 @@ +'use strict' +const { ObjectId } = require('mongodb') +const store = require('../store') +const pubUtils = require('./utils') +const pubObject = require('./object') +const pubFederation = require('./federation') +module.exports = { + address, + addToOutbox, + build +} + +function build (type, actorId, object, to, cc, etc) { + const oid = new ObjectId() + const act = Object.assign({ + // _id: oid, + id: pubUtils.objectIdToIRI(oid), + type, + actor: actorId, + object, + to, + cc, + published: new Date().toISOString() + }, etc) + return act +} + +async function address (activity) { + let audience = [] + ;['to', 'bto', 'cc', 'bcc', 'audience'].forEach(t => { + if (activity[t]) { + audience = audience.concat(activity[t]) + } + }) + audience = audience.map(t => { + if (t === 'https://www.w3.org/ns/activitystreams#Public') { + return null + } + return pubObject.resolveObject(t) + }) + audience = await Promise.all(audience).then(addresses => { + // TODO: spec says only deliver to actor-owned collections + addresses = addresses.map(t => { + if (t && t.inbox) { + return t + } + if (t && t.items) { + return t.items.map(pubObject.resolveObject) + } + if (t && t.orderedItems) { + return t.orderedItems.map(pubObject.resolveObject) + } + }) + // flattens and resolves collections + return Promise.all([].concat(...addresses)) + }) + audience = audience.filter(t => t && t.inbox) + .map(t => t.inbox) + // de-dupe + return Array.from(new Set(audience)) +} + +function addToOutbox (actor, activity) { + return Promise.all([ + // ensure object is cached, but don't alter representation in activity + // so activities can be sent with objects as links + pubObject.resolve(activity.object), + store.stream.save(activity), + address(activity).then(addresses => pubFederation.deliver(actor, activity, addresses)) + ]) +} diff --git a/pub/actor.js b/pub/actor.js index c8234e7..4dd6c09 100644 --- a/pub/actor.js +++ b/pub/actor.js @@ -49,15 +49,15 @@ function createLocalActor (name, type) { }) } -async function getOrCreateActor (preferredUsername, db, includeMeta) { +async function getOrCreateActor (preferredUsername, includeMeta) { const id = pubUtils.usernameToIRI(preferredUsername) - let user = await store.actor.getActor(id, db, includeMeta) + let user = await store.actor.getActor(id, includeMeta) if (user) { return user } // auto create groups whenever an unknown actor is referenced user = await createLocalActor(preferredUsername, 'Group') - await db.collection('objects').insertOne(user) + await store.object.save(user) // only executed on success delete user._id if (includeMeta !== true) { diff --git a/pub/federation.js b/pub/federation.js index 8ef2167..e4ec69c 100644 --- a/pub/federation.js +++ b/pub/federation.js @@ -1,9 +1,11 @@ 'use strict' const request = require('request-promise-native') +const pubUtils = require('./utils') // federation communication utilities module.exports = { - requestObject + requestObject, + deliver } function requestObject (id) { @@ -13,3 +15,29 @@ function requestObject (id) { json: true }) } + +function deliver (actor, activity, addresses) { + if (activity.bto) { + delete activity.bto + } + if (activity.bcc) { + delete activity.bcc + } + const requests = addresses.map(addr => { + return request({ + method: 'POST', + url: addr, + headers: { + 'Content-Type': 'application/activity+json' + }, + httpSignature: { + key: actor._meta.privateKey, + keyId: actor.id, + headers: ['(request-target)', 'host', 'date'] + }, + json: true, + body: pubUtils.toJSONLD(activity) + }) + }) + return Promise.all(requests) +} diff --git a/pub/index.js b/pub/index.js index ebf56d4..1d10c20 100644 --- a/pub/index.js +++ b/pub/index.js @@ -1,6 +1,7 @@ 'use strict' // ActivityPub / ActivityStreams utils module.exports = { + activity: require('./activity'), actor: require('./actor'), consts: require('./consts'), federation: require('./federation'), diff --git a/pub/object.js b/pub/object.js index 8bd7f30..f2b21dd 100644 --- a/pub/object.js +++ b/pub/object.js @@ -1,17 +1,30 @@ 'use strict' const store = require('../store') const federation = require('./federation') +const pubUtils = require('./utils') module.exports = { - resolveObject + resolveObject, + resolve: resolveObject } // find object in local DB or fetch from origin server -async function resolveObject (id, db) { - let object = await store.object.get(id, db) - if (object) { - return object +async function resolveObject (id) { + let object + if (pubUtils.validateObject(id)) { + // already an object + object = id + } else { + // resolve id to local object + object = await store.object.get(id) + if (object) { + return object + } + // resolve remote object from id + object = await federation.requestObject(id) + } + // cache non-collection objects + if (object.type !== 'Collection' && object.type !== 'OrderedCollection') { + await store.object.save(object) } - object = await federation.requestObject(id) - await store.object.save(object, db) return object } diff --git a/pub/utils.js b/pub/utils.js index d6778fe..cddcb83 100644 --- a/pub/utils.js +++ b/pub/utils.js @@ -1,12 +1,15 @@ 'use strict' const config = require('../config.json') -const consts = require('./consts') +const pubConsts = require('./consts') module.exports = { usernameToIRI, toJSONLD, arrayToCollection, - actorFromActivity + actorFromActivity, + objectIdToIRI, + validateActivity, + validateObject } function actorFromActivity (activity) { @@ -21,7 +24,7 @@ function actorFromActivity (activity) { function arrayToCollection (arr, ordered) { return { - '@context': consts.ASContext, + '@context': pubConsts.ASContext, totalItems: arr.length, type: ordered ? 'orderedCollection' : 'collection', [ordered ? 'orderedItems' : 'items']: arr @@ -29,10 +32,30 @@ function arrayToCollection (arr, ordered) { } function toJSONLD (obj) { - obj['@context'] = obj['@context'] || consts.ASContext + obj['@context'] = obj['@context'] || pubConsts.ASContext return obj } function usernameToIRI (user) { return `https://${config.DOMAIN}/u/${user}` } + +function objectIdToIRI (oid) { + if (oid.toHexString) { + oid = oid.toHexString() + } + return `https://${config.DOMAIN}/o/${oid}` +} + +function validateObject (object) { + if (object && object.id) { + // object['@context'] = object['@context'] || pubConsts.ASContext + return true + } +} + +function validateActivity (object) { + if (object && object.id && object.actor) { + return true + } +} diff --git a/routes/inbox.js b/routes/inbox.js index 02669ca..83151ac 100644 --- a/routes/inbox.js +++ b/routes/inbox.js @@ -2,58 +2,30 @@ const express = require('express') const router = express.Router() const pub = require('../pub') const net = require('../net') -const request = require('request-promise-native') -const { ObjectId } = require('mongodb') +const store = require('../store') router.post('/', net.validators.activity, net.security.verifySignature, function (req, res) { - const db = req.app.get('db') req.body._meta = { _target: pub.utils.usernameToIRI(req.user) } // side effects switch (req.body.type) { case 'Accept': - // TODO - side effect ncessary for following collection? + // TODO - side effect necessary for following collection? break case 'Follow': req.body._meta._target = req.body.object.id // send acceptance reply - Promise.all([ - pub.actor.getOrCreateActor(req.user, db, true), - pub.object.resolveObject(pub.utils.actorFromActivity(req.body), db) - ]) - .then(([user, actor]) => { - if (!actor || !actor.inbox) { - throw new Error('unable to send follow request acceptance: actor inbox not retrievable') - } - const newID = new ObjectId() - const responseOpts = { - method: 'POST', - url: actor.inbox, - headers: { - 'Content-Type': 'application/activity+json' - }, - httpSignature: { - key: user._meta.privateKey, - keyId: user.id, - headers: ['(request-target)', 'host', 'date'] - }, - json: true, - body: pub.utils.toJSONLD({ - _id: newID, - type: 'Accept', - id: `https://${req.app.get('domain')}/o/${newID.toHexString()}`, - actor: user.id, - object: req.body - }) - } - return request(responseOpts) + pub.actor.getOrCreateActor(req.user, true) + .then(user => { + const to = [pub.utils.actorFromActivity(req.body)] + const accept = pub.activity.build('Accept', user.id, req.body.id, to) + return pub.activity.addToOutbox(user, accept) }) - .then(result => console.log('success', result)) .catch(e => console.log(e)) break } Promise.all([ - db.collection('objects').insertOne(req.body.object), - db.collection('streams').insertOne(req.body) + pub.object.resolve(req.body.object), + store.stream.save(req.body) ]).then(() => res.status(200).send()) .catch(err => { console.log(err) diff --git a/routes/outbox.js b/routes/outbox.js index e4a7561..909e875 100644 --- a/routes/outbox.js +++ b/routes/outbox.js @@ -2,13 +2,14 @@ const express = require('express') const router = express.Router() const net = require('../net') const pub = require('../pub') +const store = require('../store') router.post('/', net.validators.outboxActivity, function (req, res) { - const db = req.app.get('db') - Promise.all([ - db.collection('objects').insertOne(req.body.object), - db.collection('streams').insertOne(req.body) - ]).then(() => res.status(200).send()) + store.actor.get(pub.utils.usernameToIRI(req.user), true) + .then(actor => { + return pub.activity.addToOutbox(actor, req.body) + }) + .then(() => res.status(200).send()) .catch(err => { console.log(err) res.status(500).send() diff --git a/store/actor.js b/store/actor.js index 0fe885a..bbe4737 100644 --- a/store/actor.js +++ b/store/actor.js @@ -1,13 +1,15 @@ 'use strict' - +const connection = require('./connection') module.exports = { - getActor + getActor, + get: getActor } const actorProj = { _id: 0, _meta: 0 } const metaActorProj = { _id: 0 } -function getActor (id, db, includeMeta) { +function getActor (id, includeMeta) { + const db = connection.getDb() return db.collection('objects') .find({ id: id }) .limit(1) diff --git a/store/connection.js b/store/connection.js new file mode 100644 index 0000000..f0b0470 --- /dev/null +++ b/store/connection.js @@ -0,0 +1,8 @@ +'use strict' +module.exports = (function () { + let con + return { + setDb: db => { con = db }, + getDb: () => con + } +})() diff --git a/store/index.js b/store/index.js index b607e56..aabcbd4 100644 --- a/store/index.js +++ b/store/index.js @@ -3,6 +3,7 @@ module.exports = { setup: require('./setup'), actor: require('./actor'), - object: require('./object') -// stream: require('./stream'), + object: require('./object'), + stream: require('./stream'), + connection: require('./connection') } diff --git a/store/object.js b/store/object.js index b9a7d9e..2a38b1e 100644 --- a/store/object.js +++ b/store/object.js @@ -1,19 +1,29 @@ 'use strict' - +const connection = require('./connection') module.exports = { get, save } -function get (id, db) { - return db.collection('objects') +function get (id) { + return connection.getDb() + .collection('objects') .find({ id: id }) .limit(1) .project({ _id: 0, _meta: 0 }) .next() } -function save (object, db) { +async function save (object) { + const db = connection.getDb() + const exists = await db.collection('objects') + .find({ id: object.id }) + .project({ _id: 1 }) + .limit(1) + .hasNext() + if (exists) { + return false + } return db.collection('objects') - .insertOne(object) + .insertOne(object, { forceServerObjectId: true }) } diff --git a/store/setup.js b/store/setup.js index 4a97dba..751c462 100644 --- a/store/setup.js +++ b/store/setup.js @@ -1,6 +1,7 @@ 'use strict' - -module.exports = async function dbSetup (db, domain, dummyUser) { +const connection = require('./connection') +module.exports = async function dbSetup (domain, dummyUser) { + const db = connection.getDb() // inbox await db.collection('streams').createIndex({ '_meta._target': 1, diff --git a/store/stream.js b/store/stream.js new file mode 100644 index 0000000..f7175fd --- /dev/null +++ b/store/stream.js @@ -0,0 +1,34 @@ +'use strict' +const connection = require('./connection') +module.exports = { + // get, + save +} + +// function get (id, type, db) { +// return db.collection('objects') +// .find({ id: id }) +// .limit(1) +// .project({ _id: 0, _meta: 0 }) +// .next() +// } + +async function save (activity) { + const db = connection.getDb() + const q = { id: activity.id } + if (activity._meta && activity._meta._target) { + q['_meta._target'] = activity._meta._target + } + const exists = await db.collection('streams') + .find(q) + .project({ _id: 1 }) + .limit(1) + .hasNext() + if (exists) { + return false + } + + return db.collection('streams') + // server object ID avoids mutating local copy of document + .insertOne(activity, { forceServerObjectId: true }) +}