autorun source code walkthrough #1

@gismanli

Here is a short example. Before reading on, guess which observables this autorun will track.

const {
  autorun,
  observable,
  extendObservable,
  whyRun
} = mobx;

const arr = observable([{a: {b: 1}}]);
const obj = observable({
  x: 1,
  name: 'gismanli'
});
obj.y = {};
extendObservable(obj.y, {z: 1});

autorun(() => {
	obj.name = 'lili39';
	console.log(obj.x, obj.y.z, arr[0].a.b);
	whyRun();
});

Try it out to see what actually gets tracked: autorun

autorun

A walkthrough of the source code that runs when autorun executes (note: the excerpts below have been trimmed where appropriate).

The autorun entry point

<!-- src/core/autorun.ts -->
function autorun(arg1: any, arg2: any, arg3?: any) {
	let name: string, view: (r: IReactionPublic) => void, scope: any;
	if (typeof arg1 === "string") {
		name = arg1;
		view = arg2;
		scope = arg3;
	} else if (typeof arg1 === "function") {
		name = arg1.name || ("Autorun@" + getNextId());
		view = arg1;
		scope = arg2;
	}

	if (scope)
		view = view.bind(scope);

	const reaction = new Reaction(name, function () {
		this.track(reactionRunner);
	});

	function reactionRunner() {
		view(reaction);
	}

	reaction.schedule();

	return reaction.getDisposer();
}

At the start, autorun first creates a Reaction.

So what is a Reaction?

Reaction

As the name suggests, a Reaction reacts to changes. It is a special kind of derivation. Let's look at the definition of the Reaction class:

<!-- src/core/reaction.ts -->
export class Reaction implements IDerivation, IReactionPublic {
	observing = []; // nodes we are looking at. Our value depends on these nodes
	newObserving = [];
	dependenciesState = IDerivationState.NOT_TRACKING;
	diffValue = 0;
	runId = 0;
	unboundDepsCount = 0;
	__mapid = "#" + getNextId();
	isDisposed = false;
	_isScheduled = false;
	_isTrackPending = false;
	_isRunning = false;

	constructor(public name: string = "Reaction@" + getNextId(), private onInvalidate: () => void) { }

	onBecomeStale() {
		this.schedule();
	}

	schedule() {
		if (!this._isScheduled) {
			this._isScheduled = true;
			globalState.pendingReactions.push(this);
			startBatch();
			runReactions();
			endBatch();
		}
	}

    ...
}

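Reaction takes two arguments: name and onInvalidate. The class has a handful of methods; the commonly used ones are:

  • schedule: kicks off a reaction
  • runReaction: internal method; to kick off a reaction, use schedule instead
  • track: the dependency collection (tracking) method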

Back in autorun: once the Reaction has been instantiated, reaction.schedule() is called. It pushes the Reaction onto the global pendingReactions queue and then calls runReactions. Let's look at runReactions:

<!-- src/core/reaction.ts -->
 let reactionScheduler: (fn: () => void) => void = f => f();
 
 export function runReactions() {
	if (globalState.isRunningReactions === true || globalState.inTransaction > 0)
		return;
	reactionScheduler(runReactionsHelper);
}

function runReactionsHelper() {
	globalState.isRunningReactions = true;
	const allReactions = globalState.pendingReactions;
	let iterations = 0;

	while (allReactions.length > 0) {
		if (++iterations === MAX_REACTION_ITERATIONS) {
			resetGlobalState();
			throw new Error(`Reaction doesn't converge to a stable state after ${MAX_REACTION_ITERATIONS} iterations.`
				+ ` Probably there is a cycle in the reactive function: ${allReactions[0]}`);
		}
		let remainingReactions = allReactions.splice(0);
		for (let i = 0, l = remainingReactions.length; i < l; i++)
			remainingReactions[i].runReaction();
	}
	globalState.isRunningReactions = false;
}

This drains the global queue: it takes the reactions in remainingReactions one by one and calls runReaction on each.
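To make the queue handling easier to follow, here is a minimal sketch of the same idea in isolation. It is a toy model, not MobX's code: ToyReaction, MAX_ITERATIONS and the module-level pendingReactions array are names made up for this illustration, and transactions are left out entirely.

```javascript
// Toy model of the pending-reaction queue; not MobX's actual code.
const MAX_ITERATIONS = 100; // plays the role of MAX_REACTION_ITERATIONS
const pendingReactions = [];
let isRunningReactions = false;

class ToyReaction {
  constructor(name, onInvalidate) {
    this.name = name;
    this.onInvalidate = onInvalidate;
    this.isScheduled = false;
  }
  schedule() {
    if (this.isScheduled) return; // already queued, nothing to do
    this.isScheduled = true;
    pendingReactions.push(this);
    runReactions();
  }
  runReaction() {
    this.isScheduled = false;
    this.onInvalidate();
  }
}

function runReactions() {
  // If a drain loop is already active, it will pick up the new entries.
  if (isRunningReactions) return;
  isRunningReactions = true;
  try {
    let iterations = 0;
    while (pendingReactions.length > 0) {
      if (++iterations === MAX_ITERATIONS) {
        pendingReactions.length = 0;
        throw new Error("Reactions did not converge");
      }
      // splice(0) empties the queue and returns its former contents
      const batch = pendingReactions.splice(0);
      for (const reaction of batch) reaction.runReaction();
    }
  } finally {
    isRunningReactions = false;
  }
}

const log = [];
const second = new ToyReaction("second", () => log.push("second"));
const first = new ToyReaction("first", () => {
  log.push("first");
  second.schedule(); // scheduled while the drain loop is running
});
first.schedule();
console.log(log); // [ 'first', 'second' ]
```

A reaction scheduled from inside another reaction is not run recursively; it waits in the queue for the next pass of the while loop. A reaction that keeps rescheduling itself hits the iteration limit and throws.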

<!-- src/core/reaction.ts -->
/**
 * internal, use schedule() if you intend to kick off a reaction
 */
runReaction() {
	if (!this.isDisposed) {
		this._isScheduled = false;
		if (shouldCompute(this)) {
			this._isTrackPending = true;

			this.onInvalidate();
		}
	}
}

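It first checks whether the reaction has already been disposed, then uses shouldCompute to decide whether the reaction's dependencies have changed. There are several states:

  • UP_TO_DATE = 0
  • NOT_TRACKING = -1
  • STALE = 2
  • POSSIBLY_STALE = 1

<!-- src/core/atom.ts -->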
export enum IDerivationState {
	// before being run or (outside batch and not being observed)
	// at this point derivation is not holding any data about dependency tree
	NOT_TRACKING = -1,
	// no shallow dependency changed since last computation
	// won't recalculate derivation
	// this is what makes mobx fast
	UP_TO_DATE = 0,
	// some deep dependency changed, but don't know if shallow dependency changed
	// will require to check first if UP_TO_DATE or POSSIBLY_STALE
	// currently only ComputedValue will propagate POSSIBLY_STALE
	//
	// having this state is second big optimization:
	// don't have to recompute on every dependency change, but only when it's needed
	POSSIBLY_STALE = 1,
	// shallow dependency changed
	// will need to recompute when it's needed
	STALE = 2
}

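If the dependency state is 1 (POSSIBLY_STALE), the dependencies are re-evaluated; if any of them really did change, that change propagates upward by setting the state to 2 (STALE).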
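The decision described above can be sketched as a small state check. This is a simplified model written for this walkthrough, not the library's shouldCompute: the State object mirrors the enum values listed above, and recheck is a hypothetical callback standing in for "re-evaluate the computed values this derivation depends on and report whether one changed".

```javascript
// Simplified model of the state check; a sketch, not MobX's shouldCompute.
const State = { NOT_TRACKING: -1, UP_TO_DATE: 0, POSSIBLY_STALE: 1, STALE: 2 };

// `recheck` is a hypothetical stand-in: it returns true if re-evaluating
// the dependencies shows that one of them actually changed.
function shouldComputeSketch(derivation, recheck) {
  switch (derivation.dependenciesState) {
    case State.UP_TO_DATE:
      return false; // nothing changed, skip the work
    case State.NOT_TRACKING:
    case State.STALE:
      return true; // never run, or a direct dependency changed
    case State.POSSIBLY_STALE:
      if (recheck(derivation)) {
        derivation.dependenciesState = State.STALE;
        return true;
      }
      // false alarm: the deep change did not alter any direct dependency
      derivation.dependenciesState = State.UP_TO_DATE;
      return false;
    default:
      throw new Error("Unknown dependency state");
  }
}

const derivation = { dependenciesState: State.POSSIBLY_STALE };
console.log(shouldComputeSketch(derivation, () => false)); // false
console.log(derivation.dependenciesState === State.UP_TO_DATE); // true
```

The POSSIBLY_STALE branch is what lets a reaction skip a re-run when an upstream computed value was recomputed but ended up with the same result.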

After these checks, the onInvalidate callback that was passed as the second argument when the Reaction was constructed is called. For autorun, the function passed in is:

<!-- src/core/autorun.ts -->
const reaction = new Reaction(name, function () {
	this.track(reactionRunner);
});

function reactionRunner() {
	view(reaction);
}

This function calls this.track(). As mentioned earlier, track is a method of the Reaction class. Let's look at it:

<!-- src/core/reaction.ts -->
track(fn: () => void) {
	startBatch();
	let startTime;
	this._isRunning = true;
	trackDerivedFunction(this, fn);
	this._isRunning = false;
	this._isTrackPending = false;
	if (this.isDisposed) {
		// disposed during last run. Clean up everything that was bound after the dispose call.
		clearObserving(this);
	}
	endBatch();
}

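Here MobX wraps the work in startBatch and endBatch, so unlike Vue, which has to defer updates to batch them, it can batch without a delay. The main work is done by trackDerivedFunction, so let's look at that: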

<!-- src/core/derivation.ts -->
export function trackDerivedFunction<T>(derivation: IDerivation, f: () => T) {
	changeDependenciesStateTo0(derivation);
	derivation.newObserving = new Array(derivation.observing.length + 100);
	derivation.unboundDepsCount = 0;
	derivation.runId = ++globalState.runId;
	const prevTracking = globalState.trackingDerivation;
	globalState.trackingDerivation = derivation;
	let hasException = true;
	let result: T;
	try {
		result = f.call(derivation);
		hasException = false;
	} finally {
		if (hasException) {
			handleExceptionInDerivation(derivation);
		} else {
			globalState.trackingDerivation = prevTracking;
			bindDependencies(derivation);
		}
	}
	return result;
}

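What does this method do? It executes the function f that was passed in and tracks which observables are accessed while it runs. That tracking information is stored on the derivation object, and the derivation is registered as an observer on each observable that was accessed.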

globalState is an instance of MobXGlobals:

<!-- src/core/globalstate.ts -->
export class MobXGlobals {
	version = 4;
	trackingDerivation: IDerivation = null;
	runId = 0;
	mobxGuid = 0;
	inTransaction = 0;
	isRunningReactions = false;
	inBatch: number = 0;
	pendingUnobservations: IObservable[] = [];
	pendingReactions: Reaction[] = [];
	allowStateChanges = true;
	strictMode = false;
	resetId = 0;
	spyListeners: {(change: any): void}[] = [];
}

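As for why 100 extra slots are reserved for newObserving in trackDerivedFunction, I could not work out what the point is :( . See the source comment: pre allocate array allocation + room for variation in deps; array will be trimmed by bindDependencies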

Let's look at bindDependencies:

<!-- src/core/derivation.ts -->
function bindDependencies(derivation: IDerivation) {
	const prevObserving = derivation.observing;
	const observing = derivation.observing = derivation.newObserving;

	derivation.newObserving = null; // newObserving shouldn't be needed outside tracking

	let i0 = 0, l = derivation.unboundDepsCount;
	for (let i = 0; i < l; i++) {
		const dep = observing[i];
		if (dep.diffValue === 0) {
			dep.diffValue = 1;
			if (i0 !== i) observing[i0] = dep;
			i0++;
		}
	}
	observing.length = i0;

	l = prevObserving.length;
	while (l--) {
		const dep = prevObserving[l];
		if (dep.diffValue === 0) {
			removeObserver(dep, derivation);
		}
		dep.diffValue = 0;
	}

	while (i0--) {
		const dep = observing[i0];
		if (dep.diffValue === 1) {
			dep.diffValue = 0;
			addObserver(dep, derivation);
		}
	}
}

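bindDependencies updates observing from newObserving and notifies the affected observables that they have become observed or unobserved.
It diffs newObserving against the previous observing, which has an obvious advantage: if the derivation previously depended on a and b, and newObserving is a, b, c, then after the diff only c has to be added.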
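The diff can be tried out on plain objects. The sketch below follows the same three passes and the same diffValue flag as the excerpt, but it is an illustration under simplifying assumptions: diffDependencies is a made-up name, and instead of calling addObserver and removeObserver it just returns the added and removed lists.

```javascript
// Sketch of the three-pass diff on plain objects; each dep carries a diffValue flag
// that must be 0 before and after the call.
function diffDependencies(prevObserving, newObserving) {
  const observing = [];
  const added = [];
  const removed = [];

  // Pass 1: deduplicate the new list and mark every dep in it with 1.
  for (const dep of newObserving) {
    if (dep.diffValue === 0) {
      dep.diffValue = 1;
      observing.push(dep);
    }
  }
  // Pass 2: an old dep still at 0 was not read this time, so it is dropped.
  // Resetting to 0 also separates kept deps (now 0) from brand-new ones (still 1).
  for (const dep of prevObserving) {
    if (dep.diffValue === 0) removed.push(dep);
    dep.diffValue = 0;
  }
  // Pass 3: deps still marked 1 were not in the old list, so they are new.
  for (const dep of observing) {
    if (dep.diffValue === 1) {
      dep.diffValue = 0;
      added.push(dep);
    }
  }
  return { observing, added, removed };
}

const a = { name: "a", diffValue: 0 };
const b = { name: "b", diffValue: 0 };
const c = { name: "c", diffValue: 0 };

const grown = diffDependencies([a, b], [a, b, c, a]);
console.log(grown.added.map(d => d.name));   // [ 'c' ]
console.log(grown.removed.map(d => d.name)); // []
```

No lookup table is needed: the flag on each dependency does the bookkeeping, and every flag is back at 0 when the function returns, so the next run starts clean.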

Next, the function f that was passed in is executed. While it runs, variables are read through their get operation, so let's look at that getter:

<!-- src/types/observableobject.ts -->
export function generateObservablePropConfig(propName) {
	const config = observablePropertyConfigs[propName];
	if (config)
		return config;
	return observablePropertyConfigs[propName] = {
		configurable: true,
		enumerable: true,
		get: function() {
			return this.$mobx.values[propName].get();
		},
		set: function(v) {
			setPropertyValue(this, propName, v);
		}
	};
}

<!-- src/types/observablevalue.ts -->
public get(): T {
	this.reportObserved();
	return this.value;
}

This is classic dependency collection: the observable offers itself up so that whichever reaction needs it can collect it. Let's look at reportObserved:

<!-- src/core/observable.ts -->
export function reportObserved(observable: IObservable) {
	const derivation = globalState.trackingDerivation;
	if (derivation !== null) {
		/**
		 * Simple optimization, give each derivation run an unique id (runId)
		 * Check if last time this observable was accessed the same runId is used
		 * if this is the case, the relation is already known
		 */
		if (derivation.runId !== observable.lastAccessedBy) {
			observable.lastAccessedBy = derivation.runId;
			derivation.newObserving[derivation.unboundDepsCount++] = observable;
		}
	} else if (observable.observers.length === 0) {
		queueForUnobservation(observable);
	}
}

This function adds the observable itself to the newObserving array of the Reaction instance.
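Putting the getter and reportObserved together, the collection step can be modeled in a few lines. This is a toy sketch, not MobX's classes: ToyObservable, trackSketch and the tracker object are names invented here, and only the trackingDerivation pointer and the runId shortcut from the excerpts are modeled.

```javascript
// Toy model of dependency collection; not MobX's actual classes.
const tracker = { trackingDerivation: null, runId: 0 };

class ToyObservable {
  constructor(value) {
    this.value = value;
    this.lastAccessedBy = 0;
  }
  get() {
    const derivation = tracker.trackingDerivation;
    // Same shortcut as in the excerpt: if this run already recorded us, skip.
    if (derivation !== null && derivation.runId !== this.lastAccessedBy) {
      this.lastAccessedBy = derivation.runId;
      derivation.newObserving.push(this);
    }
    return this.value;
  }
}

function trackSketch(derivation, fn) {
  derivation.newObserving = [];
  derivation.runId = ++tracker.runId; // a fresh id for every run
  const previous = tracker.trackingDerivation;
  tracker.trackingDerivation = derivation;
  try {
    return fn();
  } finally {
    tracker.trackingDerivation = previous; // restore, so nesting works
  }
}

const x = new ToyObservable(1);
const y = new ToyObservable(2);
const unused = new ToyObservable(3);
const derivation = { runId: 0, newObserving: [] };

const sum = trackSketch(derivation, () => x.get() + x.get() + y.get());
console.log(sum); // 4
console.log(derivation.newObserving.length); // 2
```

x is read twice but recorded once, and unused never shows up because nothing read it while the derivation was running. Reads outside trackSketch are not recorded at all.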

That is the end of trackDerivedFunction. Control returns to track, which calls endBatch to finish up.

At this point, the core phase of autorun is complete.

change value

A simple case: change the value of an observable and see how the Reaction runs.

First, when we change an observable variable, its set operation is triggered:

<!-- src/types/observableobject.ts -->
export function generateObservablePropConfig(propName) {
	const config = observablePropertyConfigs[propName];
	if (config)
		return config;
	return observablePropertyConfigs[propName] = {
		configurable: true,
		enumerable: true,
		get: function() {
			return this.$mobx.values[propName].get();
		},
		set: function(v) {
			setPropertyValue(this, propName, v);
		}
	};
}


export function setPropertyValue(instance, name: string, newValue) {
	const adm = instance.$mobx;
	const observable = adm.values[name];

	// intercept
	if (hasInterceptors(adm)) {
		const change = interceptChange<IObjectWillChange>(adm, {
			type: "update",
			object: instance,
			name, newValue
		});
		if (!change)
			return;
		newValue = change.newValue;
	}
	newValue = observable.prepareNewValue(newValue);

	// notify spy & observers
	if (newValue !== UNCHANGED) {
		const notify = hasListeners(adm);
		const notifySpy = isSpyEnabled();
		const change = notify || notifySpy ? {
				type: "update",
				object: instance,
				oldValue: (observable as any).value,
				name, newValue
			} : null;

		if (notifySpy)
			spyReportStart(change);
		observable.setNewValue(newValue);
		if (notify)
			notifyListeners(adm, change);
		if (notifySpy)
			spyReportEnd();
	}
}

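prepareNewValue runs first. It checks whether the value really changed by calling valueDidChange, which for structurally compared values is a deep comparison (original comment: Naive deepEqual).
If the value did change, it calls makeChildObservable to make the child properties observable.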

<!-- src/types/modifiers.ts -->
export function makeChildObservable(value, parentMode: ValueMode, name?: string) {
	let childMode: ValueMode;
	if (isObservable(value))
		return value;

	switch (parentMode) {
		case ValueMode.Reference:
			return value;
		case ValueMode.Flat:
			assertUnwrapped(value, "Items inside 'asFlat' cannot have modifiers");
			childMode = ValueMode.Reference;
			break;
		case ValueMode.Structure:
			assertUnwrapped(value, "Items inside 'asStructure' cannot have modifiers");
			childMode = ValueMode.Structure;
			break;
		case ValueMode.Recursive:
			[childMode, value] = getValueModeFromValue(value, ValueMode.Recursive);
			break;
		default:
			invariant(false, "Illegal State");
	}

	if (Array.isArray(value))
		return createObservableArray(value as any[], childMode, name);
	if (isPlainObject(value) && Object.isExtensible(value))
		return extendObservableHelper(value, value, childMode, name);
	return value;
}

If the value really did change, setNewValue is called to assign it:

<!-- src/types/observablevalue.ts -->
setNewValue(newValue: T) {
	const oldValue = this.value;
	this.value = newValue;
	this.reportChanged();
}

<!-- src/core/atom.ts -->
public reportChanged() {
	transactionStart("propagatingAtomChange", null, false);
	propagateChanged(this);
	transactionEnd(false);
}

<!-- src/core/observable.ts -->
// Called by Atom when it's value changes
export function propagateChanged(observable: IObservable) {
	// invariantLOS(observable, "changed start");
	if (observable.lowestObserverState === IDerivationState.STALE) return;
	observable.lowestObserverState = IDerivationState.STALE;

	const observers = observable.observers;
	let i = observers.length;
	while (i--) {
		const d = observers[i];
		if (d.dependenciesState === IDerivationState.UP_TO_DATE)
			d.onBecomeStale();
		d.dependenciesState = IDerivationState.STALE;
	}
	// invariantLOS(observable, "changed end");
}

reportChanged is how the observable tells its reactions: "All of you that depend on me, I have been updated, so you need to update too!"

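propagateChanged performs the notification: it first marks the state as STALE, then loops over the observers and calls each one's onBecomeStale method.
That method belongs to the Reaction class we instantiated earlier. Let's look at its code again: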

<!-- src/core/reaction.ts -->
onBecomeStale() {
	this.schedule();
}

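Hm? Doesn't this look familiar? That's right, it is the same thing: it schedules the reaction again and repeats the process described above, including collecting the dependencies afresh. This is what we usually call dynamic dependency collection.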
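To see dynamic dependency collection in action without the real library, here is a minimal reactive core. It is only a sketch under heavy simplifying assumptions: Box and miniAutorun are invented names, reactions run synchronously on every change, and there is no batching, no state enum and no disposer.

```javascript
// Minimal reactive core for illustration only; MobX itself is far more involved.
let currentReaction = null;

class Box {
  constructor(value) {
    this.value = value;
    this.observers = new Set();
  }
  get() {
    // Report ourselves to whichever reaction is currently running.
    if (currentReaction !== null) currentReaction.newObserving.add(this);
    return this.value;
  }
  set(next) {
    if (next === this.value) return; // unchanged, nobody is notified
    this.value = next;
    // Copy first, because re-running a reaction mutates this.observers.
    for (const reaction of [...this.observers]) reaction.run();
  }
}

function miniAutorun(view) {
  const reaction = {
    observing: new Set(),
    newObserving: new Set(),
    run() {
      this.newObserving = new Set();
      const previous = currentReaction;
      currentReaction = this;
      try {
        view();
      } finally {
        currentReaction = previous;
      }
      // Rebind: drop deps that were not read this time, add the ones that were.
      for (const dep of this.observing) {
        if (!this.newObserving.has(dep)) dep.observers.delete(this);
      }
      for (const dep of this.newObserving) dep.observers.add(this);
      this.observing = this.newObserving;
    },
  };
  reaction.run();
  return reaction;
}

const useA = new Box(true);
const boxA = new Box("a1");
const boxB = new Box("b1");
const seen = [];

miniAutorun(() => {
  seen.push(useA.get() ? boxA.get() : boxB.get());
});
boxB.set("b2");  // not read in the last run, so nothing happens
useA.set(false); // re-run, which now reads boxB instead of boxA
boxA.set("a2");  // no longer a dependency, so nothing happens
boxB.set("b3");  // re-run
console.log(seen); // [ 'a1', 'b2', 'b3' ]
```

Because the dependency set is rebuilt on every run, the reaction stops listening to boxA as soon as the branch that reads it is no longer taken.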

--- The end.
