Planning and Executing Connector Ops

Like Terraform or Pulumi, Autoschematic is used to create, modify, and delete infrastructure. At its core, this means reasoning about two things: the current state of the resource at addr, and the desired state that we want it to be in. Connector::plan(...) is where connectors implement this comparison, and autoschematic plan invokes it for every staged file in git.

The core types involved are:

pub struct PlanResponseElement {
    pub op_definition: String,
    pub writes_outputs: Vec<String>,
    pub friendly_message: Option<String>,
}

pub struct OpExecResponse {
    pub outputs: Option<OutputMapExec>,
    pub friendly_message: Option<String>,
}

pub trait ConnectorOp: Send + Sync + std::fmt::Debug {
    fn to_string(&self) -> anyhow::Result<String>;
    fn from_str(s: &str) -> anyhow::Result<Self>
    where
        Self: Sized;
}
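
To see how these types fit together, here is a minimal sketch that compiles with the standard library alone. It assumes that op_definition carries the output of ConnectorOp::to_string, which the field names suggest but the types above do not state. DemoOp and plan_element are hypothetical names made up for this illustration, and String errors stand in for anyhow.

```rust
// Stand-ins for the types above, with String errors instead of anyhow
// so that the sketch needs no external crates.
#[derive(Debug, PartialEq)]
pub struct PlanResponseElement {
    pub op_definition: String,
    pub writes_outputs: Vec<String>,
    pub friendly_message: Option<String>,
}

pub trait ConnectorOp: Send + Sync + std::fmt::Debug {
    fn to_string(&self) -> Result<String, String>;
    fn from_str(s: &str) -> Result<Self, String>
    where
        Self: Sized;
}

// Hypothetical op type, for illustration only.
#[derive(Debug, PartialEq)]
pub enum DemoOp {
    Create,
    Delete,
}

impl ConnectorOp for DemoOp {
    fn to_string(&self) -> Result<String, String> {
        Ok(match self {
            DemoOp::Create => "Create".to_string(),
            DemoOp::Delete => "Delete".to_string(),
        })
    }

    fn from_str(s: &str) -> Result<Self, String> {
        match s {
            "Create" => Ok(DemoOp::Create),
            "Delete" => Ok(DemoOp::Delete),
            other => Err(format!("unknown op: {other}")),
        }
    }
}

// Hypothetical helper (an assumption, not part of Autoschematic):
// serialize the op and attach a human-readable message.
pub fn plan_element(op: &impl ConnectorOp, message: &str) -> Result<PlanResponseElement, String> {
    Ok(PlanResponseElement {
        op_definition: op.to_string()?,
        writes_outputs: Vec::new(),
        friendly_message: Some(message.to_string()),
    })
}

fn main() {
    let element = plan_element(&DemoOp::Create, "Create the demo resource").unwrap();
    println!("{element:?}");
}
```

Under this assumption, the plan step only ever hands strings back to the caller, and the connector recovers the typed op later with from_str.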

Connector::plan(addr, current, desired) is where your connector decodes addr, compares the current state with the desired state, and computes a series of ops. Once each op has been executed by Connector::op_exec(addr, op), the new current state (as fetched by Connector::get(addr)) should equal the desired state, or come as close to it as possible. Let's look at the AWS API Gateway V2 connector for an example:

impl Connector for ApiGatewayV2Connector {
    async fn plan(
        &self,
        addr: &Path,
        current: Option<Vec<u8>>,
        desired: Option<Vec<u8>>,
    ) -> Result<Vec<PlanResponseElement>, anyhow::Error> {
        let addr = ApiGatewayV2ResourceAddress::from_path(addr)?;
        let mut res = Vec::new();

        match addr {
            ApiGatewayV2ResourceAddress::Api { region: _, api_id } => match (current, desired) {
                (None, None) => {}
                (None, Some(new_api_bytes)) => {
                    let new_api: Api = RON.from_bytes(&new_api_bytes)?;
                    res.push(connector_op!(
                        ApiGatewayV2ConnectorOp::CreateApi(new_api.clone()),
                        format!("Create new API Gateway V2 API `{}`", new_api.name)
                    ));
                }
                (Some(_old_api_bytes), None) => {
                    res.push(connector_op!(
                        ApiGatewayV2ConnectorOp::DeleteApi,
                        format!("Delete API Gateway V2 API `{}`", api_id)
                    ));
                }
                (Some(old_api_bytes), Some(new_api_bytes)) => {
                    let old_api: Api = RON.from_bytes(&old_api_bytes)?;
                    let new_api: Api = RON.from_bytes(&new_api_bytes)?;

                    if old_api != new_api {
                        if old_api.name != new_api.name
                            || old_api.protocol_type != new_api.protocol_type
                            || old_api.api_endpoint != new_api.api_endpoint
                        {
                            let diff = diff_ron_values(&old_api, &new_api).unwrap_or_default();
                            res.push(connector_op!(
                                ApiGatewayV2ConnectorOp::UpdateApi(old_api.clone(), new_api.clone()),
                                format!("Modify API Gateway V2 API `{}`\n{}", old_api.name, diff)
                            ));
                        }

                        if old_api.tags != new_api.tags {
                            let diff = diff_ron_values(&old_api.tags, &new_api.tags).unwrap_or_default();
                            res.push(connector_op!(
                                ApiGatewayV2ConnectorOp::UpdateApiTags(
                                    old_api.tags.unwrap_or_default(),
                                    new_api.tags.unwrap_or_default()
                                ),
                                format!("Modify tags for API Gateway V2 API `{}`\n{}", old_api.name, diff)
                            ));
                        }
                    }
                }
            },
            ApiGatewayV2ResourceAddress::Route {
                region,
                api_id,
                route_id,
            } => match (current, desired) {
                [...]
            }
            [...]
        }
        [...]
    }
    [...]
}
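
Stripped of the AWS specifics, the shape of plan is a match over the four (current, desired) combinations. The following is a minimal sketch of that shape, using only the standard library. Widget and WidgetOp are hypothetical types invented for this example, and the function returns the ops directly rather than serialized PlanResponseElement values.

```rust
// Hypothetical resource state, for illustration only.
#[derive(Debug, Clone, PartialEq)]
struct Widget {
    name: String,
    tags: Vec<String>,
}

// Hypothetical ops. A real connector would serialize these
// rather than return them directly.
#[derive(Debug, Clone, PartialEq)]
enum WidgetOp {
    Create(Widget),
    Rename { from: String, to: String },
    UpdateTags { old: Vec<String>, new: Vec<String> },
    Delete,
}

fn plan(current: Option<Widget>, desired: Option<Widget>) -> Vec<WidgetOp> {
    let mut ops = Vec::new();
    match (current, desired) {
        // Nothing exists and nothing is wanted: no ops.
        (None, None) => {}
        // Nothing exists yet: create it.
        (None, Some(new)) => ops.push(WidgetOp::Create(new)),
        // It exists but is no longer wanted: delete it.
        (Some(_), None) => ops.push(WidgetOp::Delete),
        // Both exist: emit one fine-grained op per differing aspect.
        (Some(old), Some(new)) => {
            if old.name != new.name {
                ops.push(WidgetOp::Rename { from: old.name, to: new.name });
            }
            if old.tags != new.tags {
                ops.push(WidgetOp::UpdateTags { old: old.tags, new: new.tags });
            }
        }
    }
    ops
}

fn main() {
    let old = Widget { name: "a".into(), tags: vec!["env=dev".into()] };
    let new = Widget { name: "a".into(), tags: vec!["env=prod".into()] };
    for op in plan(Some(old), Some(new)) {
        println!("{op:?}");
    }
}
```

When current and desired are equal, the last arm pushes nothing, so an unchanged resource produces an empty plan.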

Your custom ConnectorOp is usually an enum of reasonably fine-grained operations for your connector. You implement to_string and from_str yourself; the S3 connector, for example, serializes its ops with RON:

#[derive(Debug, Serialize, Deserialize)]
pub enum S3ConnectorOp {
    CreateBucket(S3Bucket),
    UpdateBucketPolicy(Option<ron::Value>, Option<ron::Value>),
    UpdateBucketPublicAccessBlock(Option<PublicAccessBlock>),
    UpdateBucketAcl(Option<Acl>, Option<Acl>),
    UpdateBucketTags(Tags, Tags),
    DeleteBucket,
}

impl ConnectorOp for S3ConnectorOp {
    fn to_string(&self) -> Result<String, anyhow::Error> {
        Ok(RON.to_string(self)?)
    }

    fn from_str(s: &str) -> Result<Self, anyhow::Error>
    where
        Self: Sized,
    {
        Ok(RON.from_str(s)?)
    }
}
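
What matters is that from_str reverses to_string, whatever the format. As a sketch of that round-trip property without any external crates, the example below hand-rolls a tab-separated format. TagOp and the format itself are hypothetical and chosen only for illustration, and inherent methods with String errors stand in for the ConnectorOp trait and anyhow.

```rust
// Hypothetical op type, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum TagOp {
    SetTag { key: String, value: String },
    RemoveTag { key: String },
    DeleteResource,
}

// Fields are tab-separated, so a field containing a tab cannot be encoded.
fn field(s: &str) -> Result<&str, String> {
    if s.contains('\t') {
        Err(format!("field contains a tab: {s:?}"))
    } else {
        Ok(s)
    }
}

impl TagOp {
    fn to_string(&self) -> Result<String, String> {
        Ok(match self {
            TagOp::SetTag { key, value } => {
                format!("SetTag\t{}\t{}", field(key)?, field(value)?)
            }
            TagOp::RemoveTag { key } => format!("RemoveTag\t{}", field(key)?),
            TagOp::DeleteResource => "DeleteResource".to_string(),
        })
    }

    fn from_str(s: &str) -> Result<Self, String> {
        let parts: Vec<&str> = s.split('\t').collect();
        match parts.as_slice() {
            ["SetTag", key, value] => Ok(TagOp::SetTag {
                key: key.to_string(),
                value: value.to_string(),
            }),
            ["RemoveTag", key] => Ok(TagOp::RemoveTag { key: key.to_string() }),
            ["DeleteResource"] => Ok(TagOp::DeleteResource),
            _ => Err(format!("unrecognized op: {s:?}")),
        }
    }
}

fn main() {
    let op = TagOp::SetTag { key: "env".into(), value: "prod".into() };
    let text = op.to_string().unwrap();
    // The decoded op should equal the one we started with.
    assert_eq!(TagOp::from_str(&text), Ok(op));
    println!("{text:?}");
}
```

A serde-based format such as RON avoids having to write and maintain the parser by hand, which is likely why the S3 connector above takes that route.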
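
Connector::op_exec(addr, op) then decodes both the address and the serialized op, and dispatches to the code that carries the op out:

impl Connector for ApiGatewayV2Connector {
    async fn op_exec(&self, addr: &Path, op: &str) -> Result<OpExecResponse, anyhow::Error> {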
        let addr = ApiGatewayV2ResourceAddress::from_path(addr)?;
        let op = ApiGatewayV2ConnectorOp::from_str(op)?;

        let account_id = self.account_id.read().await.clone();

        match &addr {
            ApiGatewayV2ResourceAddress::Api { region, api_id } => {
                let client = self.get_or_init_client(region).await?;
                match op {
                    ApiGatewayV2ConnectorOp::CreateApi(api) => create_api(&client, &account_id, region, api).await,
                    ApiGatewayV2ConnectorOp::UpdateApi(old_api, new_api) => {
                        update_api(&client, &account_id, region, api_id, old_api, new_api).await
                    }
                    ApiGatewayV2ConnectorOp::UpdateApiTags(old_tags, new_tags) => {
                        update_api_tags(&client, &account_id, region, api_id, old_tags, new_tags).await
                    }
                    ApiGatewayV2ConnectorOp::DeleteApi => delete_api(&client, region, api_id).await,
                    _ => Err(invalid_op(&addr, &op)),
                }
            }
            ApiGatewayV2ResourceAddress::Route {
                region,
                api_id,
                route_id,
            } => {
                [...]
            }
            [...]
        }
    }
    [...]
}
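
Each address variant gets its own inner match over the ops that are valid for it. The dispatch for a VPC address follows the same shape:

match op {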
    VpcConnectorOp::CreateVpc(vpc) => op_impl::create_vpc(&client, &vpc).await,
    VpcConnectorOp::UpdateVpcTags(old_tags, new_tags) => {
        op_impl::update_vpc_tags(&client, &vpc_id, &old_tags, &new_tags).await
    }
    VpcConnectorOp::DeleteVpc => op_impl::delete_vpc(&client, &vpc_id).await,
    _ => Err(invalid_op(&addr, &op)),
}
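
The dispatch pattern in both snippets can be sketched without the AWS clients. The example below is synchronous and uses only the standard library. Addr, NetOp, and the String results are hypothetical stand-ins for the connector's address type, op type, and OpExecResponse, and this invalid_op is a simplified assumption about what the helper of the same name reports.

```rust
// Hypothetical address and op types, for illustration only.
#[derive(Debug)]
enum Addr {
    Vpc { vpc_id: String },
    Subnet { subnet_id: String },
}

#[derive(Debug)]
enum NetOp {
    CreateVpc,
    DeleteVpc,
    CreateSubnet,
    DeleteSubnet,
}

// Assumed behavior: describe an op that does not apply to the address.
fn invalid_op(addr: &Addr, op: &NetOp) -> String {
    format!("invalid op {op:?} for address {addr:?}")
}

fn op_exec(addr: &Addr, op: NetOp) -> Result<String, String> {
    match addr {
        // Only VPC ops are valid at a VPC address.
        Addr::Vpc { vpc_id } => match op {
            NetOp::CreateVpc => Ok(format!("created VPC {vpc_id}")),
            NetOp::DeleteVpc => Ok(format!("deleted VPC {vpc_id}")),
            _ => Err(invalid_op(addr, &op)),
        },
        // Only subnet ops are valid at a subnet address.
        Addr::Subnet { subnet_id } => match op {
            NetOp::CreateSubnet => Ok(format!("created subnet {subnet_id}")),
            NetOp::DeleteSubnet => Ok(format!("deleted subnet {subnet_id}")),
            _ => Err(invalid_op(addr, &op)),
        },
    }
}

fn main() {
    let vpc = Addr::Vpc { vpc_id: "vpc-123".into() };
    println!("{:?}", op_exec(&vpc, NetOp::DeleteVpc));
    println!("{:?}", op_exec(&vpc, NetOp::CreateSubnet));
}
```

The catch-all arm matters because the op arrives as a string: nothing in the type system guarantees that a deserialized op belongs to the address it was submitted with.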
