Split-Job 0.93

Wow, it has been a long time since I had time to post anything on this blog. However, I have been making improvements to the Split-Job script over the last few months, and the results are below. I use this script a lot myself and I hope it is useful for some of you as well. Please see the previous version for more information about the concept and usage of Split-Job.

The changes are listed in the script comments. One aspect that deserves extra attention is the runspace environment. As before, the runspaces that process the pipeline objects do not necessarily have the same variables, functions etc. that your main PowerShell session contains. If you do need those, you now have two options:

  • Specify the -UseProfile switch. This will load your PS profile, which should give you (most of) your snapins, functions and aliases. This is most useful when using Split-Job interactively at the console.
  • Import the specific functions, variables, etc. you need with the SnapIn, Function, Alias and Variable parameters. Each of these accepts multiple arguments and wildcards. This is more efficient when using Split-Job inside a script.
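For example, a call that imports a custom function, a variable and a snapin into each runspace might look like this (a sketch; Get-MachineInfo, $DomainName and the snapin name are hypothetical placeholders):

```powershell
# Sketch: import a function, a variable and a snapin into each runspace.
# Get-MachineInfo, DomainName and the snapin name are made-up examples.
Get-Content machines.txt |
    Split-Job { % { Get-MachineInfo $_ } } `
        -Function Get-MachineInfo `
        -Variable DomainName `
        -SnapIn Quest.ActiveRoles.ADManagement
```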

I am actually considering making -UseProfile the default and adding an opposite -NoProfile switch instead. That would make it even easier to use Split-Job at the command line; in my case it does not slow things down noticeably. If you want, you can exclude portions of your profile (e.g. PowerTab) by testing whether $SplitJobRunSpace exists. Feedback is welcome!
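For instance, a profile could skip its slow sections inside Split-Job runspaces with a guard like this (a sketch; the PowerTab path is a hypothetical example):

```powershell
# In your PS profile: skip expensive initialization inside Split-Job runspaces
if (-not (Test-Path Variable:\SplitJobRunSpace)) {
    # Only load PowerTab in the interactive session
    . 'C:\Scripts\PowerTab\Init-TabExpansion.ps1'
}
```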

Enjoy,

Arnoud


#requires -version 1.0
################################################################################
## Run commands in multiple concurrent pipelines
##   by Arnoud Jansveld - www.jansveld.net/powershell
## Version History
## 0.93   Improve error handling: errors originating in the Scriptblock now
##        have more meaningful output
##        Show additional info in the progress bar (thanks Stephen Mills)
##        Add SnapIn parameter: imports (registered) PowerShell snapins
##        Add Function parameter: imports functions
##        Add SplitJobRunSpace variable; allows scripts to test if they are 
##        running in a runspace 
##        Add seconds remaining to progress bar (experimental)
## 0.92   Add UseProfile switch: imports the PS profile
##        Add Variable parameter: imports variables
##        Add Alias parameter: imports aliases
##        Restart pipeline if it stops due to an error
##        Set the current path in each runspace to that of the calling process
## 0.91   Revert to v 0.8 input syntax for the script block
##        Add error handling for empty input queue
## 0.9    Add logic to distinguish between scriptblocks and cmdlets or scripts:
##        if a ScriptBlock is specified, a foreach {} wrapper is added
## 0.8    Adds a progress bar
## 0.7    Stop adding runspaces if the queue is already empty
## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
################################################################################

function Split-Job {
    param (
        $Scriptblock = $(throw 'You must specify a command or script block!'),
        [int]$MaxPipelines=10,
        [switch]$UseProfile,
        [string[]]$Variable,
        [string[]]$Function = @(),
        [string[]]$Alias = @(),
        [string[]]$SnapIn
    ) 

    function Init ($InputQueue){
        # Create the shared thread-safe queue and fill it with the input objects
        $Queue = [Collections.Queue]::Synchronized([Collections.Queue]@($InputQueue))
        $QueueLength = $Queue.Count
        # Do not create more runspaces than input objects
        if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
        # Create the script to be run by each runspace
        $Script  = "Set-Location '$PWD'; "
        $Script += {
            $SplitJobQueue = $($Input)
            & {
                trap {continue}
                while ($SplitJobQueue.Count) {$SplitJobQueue.Dequeue()}
            } |
        }.ToString() + $Scriptblock

        # Create an array to keep track of the set of pipelines
        $Pipelines = New-Object System.Collections.ArrayList

        # Collect the functions and aliases to import
        $ImportItems = ($Function -replace '^','Function:') +
            ($Alias -replace '^','Alias:') |
            Get-Item | select PSPath, Definition
        $stopwatch = New-Object System.Diagnostics.Stopwatch
        $stopwatch.Start()
    }

    function Add-Pipeline {
        # This creates a new runspace and starts an asynchronous pipeline with our script.
        # It will automatically start processing objects from the shared queue.
        $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
        $Runspace.Open()
        $Runspace.SessionStateProxy.SetVariable('SplitJobRunSpace', $True)

        function CreatePipeline {
            param ($Data, $Scriptblock)
            $Pipeline = $Runspace.CreatePipeline($Scriptblock)
            if ($Data) {
                $Null = $Pipeline.Input.Write($Data, $True)
                $Pipeline.Input.Close()
            }
            $Null = $Pipeline.Invoke()
            $Pipeline.Dispose()
        }

        # Optionally import profile, variables, functions and aliases from the main runspace
        if ($UseProfile) {
            CreatePipeline -Script "`$PROFILE = '$PROFILE'; . `$PROFILE"
        }
        if ($Variable) {
            foreach ($var in (Get-Variable $Variable -Scope 2)) {
                trap {continue}
                $Runspace.SessionStateProxy.SetVariable($var.Name, $var.Value)
            }
        }
        if ($ImportItems) {
            CreatePipeline $ImportItems {
                foreach ($item in $Input) {New-Item -Path $item.PSPath -Value $item.Definition}
            }
        }
        if ($SnapIn) {
            CreatePipeline (Get-PSSnapin $Snapin -Registered) {$Input | Add-PSSnapin}
        }
        $Pipeline = $Runspace.CreatePipeline($Script)
        $Null = $Pipeline.Input.Write($Queue)
        $Pipeline.Input.Close()
        $Pipeline.InvokeAsync()
        $Null = $Pipelines.Add($Pipeline)
    }

    function Remove-Pipeline ($Pipeline) {
        # Remove a pipeline and runspace when it is done
        $Pipeline.RunSpace.Close()
        $Pipeline.Dispose()
        $Pipelines.Remove($Pipeline)
    }

    # Main 
    # Initialize the queue from the pipeline
    . Init $Input
    # Start the pipelines
    while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline} 

    # Loop through the runspaces and pass their output to the main pipeline
    while ($Pipelines.Count) {
        # Only update the progress bar once a second
        if (($stopwatch.ElapsedMilliseconds - $LastUpdate) -gt 1000) {
            $Completed = $QueueLength - $Queue.Count - $Pipelines.count
            $LastUpdate = $stopwatch.ElapsedMilliseconds
            $SecondsRemaining = $(if ($Completed) {
                (($Queue.Count + $Pipelines.Count)*$LastUpdate/1000/$Completed)
            } else {-1})
            Write-Progress 'Split-Job' ("Queues: $($Pipelines.Count)  Total: $($QueueLength)  " +
            "Completed: $Completed  Pending: $($Queue.Count)")  `
            -PercentComplete ([Math]::Max((100-[Int]($Queue.Count+$Pipelines.Count)/$QueueLength*100),0)) `
            -CurrentOperation "Next item: $(trap {continue}; if ($Queue.Count) {$Queue.Peek()})" `
            -SecondsRemaining $SecondsRemaining
        }
        foreach ($Pipeline in @($Pipelines)) {
            if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
                $Pipeline.Output.NonBlockingRead()
                $Pipeline.Error.NonBlockingRead() | Out-Default
            } else {
                # Pipeline has stopped; if there was an error show info and restart it
                if ($Pipeline.PipelineStateInfo.State -eq 'Failed') {
                    $Pipeline.PipelineStateInfo.Reason.ErrorRecord |
                        Add-Member NoteProperty writeErrorStream $True -PassThru |
                            Out-Default
                    # Restart the runspace
                    if ($Queue.Count -lt $QueueLength) {Add-Pipeline}
                }
                Remove-Pipeline $Pipeline
            }
        }
        Start-Sleep -Milliseconds 100
    }
}

Split-Job 0.92

Here is an update to the Split-Job function. Based in part on some of the comments on the previous version, I made the following changes:

  1. The format for the scriptblock has changed; this was done to make it more straightforward to specify parameters for commands and scripts that accept pipeline input. If you need a foreach (%), you will have to include it in the command line. Examples:

    "Server1","Server2","Server3" | Split-Job { c:\test.ps1 -Force }

    "Server1","Server2","Server3" | Split-Job { % {Get-WmiObject Win32_ComputerSystem -ComputerName $_}}

  2. You can now import your profile, variables and/or aliases into the runspaces. This is somewhat of an experiment; I am not convinced this is even a good idea. Please give me your feedback if you think this is useful.
  3. Each runspace will have its current directory ($PWD) set to that of the main runspace.
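To illustrate point 2, a variable from the main session can be made visible in every runspace (a sketch; $Class is just an example name):

```powershell
# Sketch: import the $Class variable into each runspace with -Variable
$Class = 'Win32_ComputerSystem'
Get-Content machines.txt |
    Split-Job { % { Get-WmiObject $Class -ComputerName $_ } } -Variable Class
```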

There is also some error handling code to make the script more robust.

Enjoy!

Arnoud

#requires -version 1.0
################################################################################
## Run commands in multiple concurrent pipelines
##   by Arnoud Jansveld - www.jansveld.net/powershell
## Version History
## 0.92   Add UseProfile switch: imports the PS profile into each runspace
##        Add Variable parameter: imports variable(s) into each runspace
##        Add Alias parameter: imports alias(es)
##        Restart pipeline if it stops due to an error
##        Set the current path in each runspace to that of the calling process
## 0.91   Revert to v 0.8 input syntax for the script block
##        Add error handling for empty input queue
## 0.9    Add logic to distinguish between scriptblocks and cmdlets or scripts:
##        if a ScriptBlock is specified, a foreach {} wrapper is added
## 0.8    Adds a progress bar
## 0.7    Stop adding runspaces if the queue is already empty
## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
################################################################################

function Split-Job (
    $Scriptblock = $(throw 'You must specify a command or script block!'),
    [int]$MaxPipelines=10,
    [switch]$UseProfile,
    [string[]]$Variable,
    [string[]]$Alias

) {
    # Create the shared thread-safe queue and fill it with the input objects
    $Queue = [Collections.Queue]::Synchronized([Collections.Queue]@($Input))
    $QueueLength = $Queue.Count
    if ($MaxPipelines -gt $QueueLength) {$MaxPipelines = $QueueLength}
    # Set up the script to be run by each runspace
    $Script  = "Set-Location '$PWD'; "
    $Script += '$Queue = $($Input); '
    $Script += '& {trap {continue}; while ($Queue.Count) {$Queue.Dequeue()}} |'
    $Script += $Scriptblock

    # Create an array to keep track of the set of pipelines
    $Pipelines = New-Object System.Collections.ArrayList

    function Add-Pipeline {
        # This creates a new runspace and starts an asynchronous pipeline with our script.
        # It will automatically start processing objects from the shared queue.
        $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
        $Runspace.Open()
        # Optionally import profile, variables and aliases from the main runspace
        if ($UseProfile) {
            $Pipeline = $Runspace.CreatePipeline(". '$PROFILE'")
            $Pipeline.Invoke()
            $Pipeline.Dispose()
        }
        if ($Variable) {
            Get-Variable $Variable -Scope 2 | foreach {
                trap {continue}
                $Runspace.SessionStateProxy.SetVariable($_.Name, $_.Value)
            }
        }
        if ($Alias) {
            # Recreate each alias in the new runspace
            $Pipeline = $Runspace.CreatePipeline({$Input | foreach {Set-Alias $_.Name $_.Definition}})
            $Null = $Pipeline.Input.Write((Get-Alias $Alias -Scope 2), $True)
            $Pipeline.Input.Close()
            $Pipeline.Invoke()
            $Pipeline.Dispose()
        }
        $Pipeline = $Runspace.CreatePipeline($Script)
        $Null = $Pipeline.Input.Write($Queue)
        $Pipeline.Input.Close()
        $Pipeline.InvokeAsync()
        $Null = $Pipelines.Add($Pipeline)
    }

    function Remove-Pipeline ($Pipeline) {
        # Remove a pipeline and runspace when it is done
        $Pipeline.RunSpace.Close()
        $Pipeline.Dispose()
        $Pipelines.Remove($Pipeline)
    }

    # Start the pipelines
    while ($Pipelines.Count -lt $MaxPipelines -and $Queue.Count) {Add-Pipeline} 

    # Loop through the pipelines and pass their output to the pipeline until they are finished
    while ($Pipelines.Count) {
        Write-Progress 'Split-Job' "Queues: $($Pipelines.Count)" `
            -PercentComplete (100 - [Int]($Queue.Count)/$QueueLength*100)
        foreach ($Pipeline in (New-Object System.Collections.ArrayList(,$Pipelines))) {
            if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
                $Pipeline.Output.NonBlockingRead()
                $Pipeline.Error.NonBlockingRead() | Write-Error
            } else {
                if ($Pipeline.PipelineStateInfo.State -eq 'Failed') {
                    Write-Error $Pipeline.PipelineStateInfo.Reason
                    # Start a new runspace, unless there was a syntax error in the scriptblock
                    if ($Queue.Count -lt $QueueLength) {Add-Pipeline}
                }
                Remove-Pipeline $Pipeline
            }
        }
        Start-Sleep -Milliseconds 100
    }
}

Get-ChildItem is not dir!

Here is a simple but practical tip from the trenches. If you have made the switch and are using PowerShell as your main console, you may have noticed that the dir command can be rather sluggish in large directories. While the transition to native PS cmdlets was eased by making "dir" an alias for Get-ChildItem, there is an important syntax difference.

If you are looking for a particular (set of) files, the trick is to split the full file name(s) into a path and file specification. To illustrate, just run the following commands and notice the performance difference:

PS C:\> dir c:\windows\system32\*.ini

PS C:\> dir c:\windows\system32 *.ini

The second command is much faster because it uses the filtering mechanism of the file system provider. I have been burned by this many times, and I suspect others have too; several scripts performed dramatically better after making this change. Incidentally, this is why PowerTab's (yes, you really SHOULD use this!) file system completion is pretty slow compared to that of the good old cmd prompt.
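You can verify the difference yourself with Measure-Command; the exact numbers will depend on the size of the directory:

```powershell
# Wildcard embedded in the path: PowerShell enumerates every item, then filters
Measure-Command { dir c:\windows\system32\*.ini }
# Path and filter specified separately: the file system provider does the filtering
Measure-Command { dir c:\windows\system32 *.ini }
```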

This is another good example of why it pays to scrutinize the PS help system:

PS C:\> help Get-ChildItem -detailed

[...]
PARAMETERS
    -path <string[]>
        Specifies a path to one or more locations. Wildcards are permitted. The default location
        is the current directory (.).

    -include <string[]>
        Retrieves only the specified items. The value of this parameter qualifies the Path
        parameter. Enter a path element or pattern, such as "*.txt". Wildcards are permitted.
[...]


Arnoud


Split-Job: make your PC work harder

When you need to run a simple process or gather (WMI) data from many machines, you need a lot of patience. Or, you can divide and conquer using multiple PowerShell runspaces. There are many ingenious scripts available on the web that allow us to launch and manage background processes (even for PS v1). For my purposes, I found the necessary inspiration in a blog by Gaurhoth. His New-TaskPool script allows us to run multiple instances of a script block concurrently. Very cool!

The following script is a little different in that it feeds the data to each pipeline through a shared thread-safe queue rather than starting a new pipeline for each input object. This reduces overhead and allows scripts that have a begin/process/end block to run more efficiently.

For instance, take this simple data gathering exercise:

Get-Content machines.txt | foreach {Get-WmiObject Win32_ComputerSystem -ComputerName $_} | Export-Csv ComputerInfo.csv

If you have a few hundred machines, this can take forever (especially if some machines are offline). Now replace the foreach alias with the Split-Job function:

Get-Content machines.txt | Split-Job {Get-WmiObject Win32_ComputerSystem -ComputerName $_} | Export-Csv ComputerInfo.csv

It will create 10 runspaces and run the WMI query concurrently, so this should be almost 10x faster. Even if one of the pipelines stalls, the others will keep going. If you already have some data gathering script that accepts pipeline input, you can just drop Split-Job in:

Get-Content machines.txt | Split-Job .\MachineReport.ps1 | Export-Csv MachineReport.csv

Note that the position of Split-Job in the pipeline matters: the command preceding it should be quick, e.g. getting objects from a text file, AD, SQL, etc.

This is a work in progress and I will post more about this in the following weeks. In the meantime, comments and suggestions are welcome!

Arnoud

[UPDATE] The latest version can be found here: http://www.jansveld.net/powershell/2008/06/split-job-092/


#requires -version 1.0
###################################################################################################
## Run commands in multiple concurrent pipelines
##   by Arnoud Jansveld
## Version History
## 0.9    Includes logic to distinguish between scriptblocks and cmdlets or scripts. If a ScriptBlock
##        is specified, a foreach {} wrapper is added
## 0.8    Adds a progress bar
## 0.7    Stop adding runspaces if the queue is already empty
## 0.6    First version. Inspired by Gaurhoth's New-TaskPool script
###################################################################################################

function Split-Job (
    $Scriptblock = $(throw 'You must specify a command or script block!'),
    [int]$MaxPipelines=10
) {
    # Create the shared thread-safe queue and fill it with the input objects
    $Queue = [System.Collections.Queue]::Synchronized([System.Collections.Queue]@($Input))
    $QueueLength = $Queue.Count
    # Set up the script to be run by each pipeline
    if ($Scriptblock -is [ScriptBlock]) {$Scriptblock = "foreach {$Scriptblock}"}
    $Script = '$Queue = $($Input); & {while ($Queue.Count) {$Queue.Dequeue()}} | ' + $Scriptblock
    # Create an array to keep track of the set of pipelines
    $Pipelines = New-Object System.Collections.ArrayList

    function Add-Pipeline {
        # This creates a new runspace and starts an asynchronous pipeline with our script.
        # It will automatically start processing objects from the shared queue.
        $Runspace = [System.Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace($Host)
        $Runspace.Open()
        $PipeLine = $Runspace.CreatePipeline($Script)
        $Null = $Pipeline.Input.Write($Queue)
        $Pipeline.Input.Close()
        $PipeLine.InvokeAsync()
        $Null = $Pipelines.Add($Pipeline)
    }

    function Remove-Pipeline ($Pipeline) {
        # Remove a pipeline and runspace when it is done
        $Pipeline.RunSpace.Close()
        $Pipeline.Dispose()
        $Pipelines.Remove($Pipeline)
    }

    # Start the pipelines
    do {Add-Pipeline} until ($Pipelines.Count -ge $MaxPipelines -or $Queue.Count -eq 0)

    # Loop through the pipelines and pass their output to the pipeline until they are finished
    while ($Pipelines.Count) {
        Write-Progress 'Split-Job' "Queues: $($Pipelines.Count)" `
            -PercentComplete (100 - [Int]($Queue.Count)/$QueueLength*100)
        foreach ($Pipeline in (New-Object System.Collections.ArrayList(,$Pipelines))) {
            if ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
                $Pipeline.Output.NonBlockingRead()
                $Pipeline.Error.NonBlockingRead() | foreach {Write-Error $_}
            } else {
                if ($Pipeline.PipelineStateInfo.State -eq 'Failed') {
                    Write-Error $Pipeline.PipelineStateInfo.Reason
                }
                Remove-Pipeline $Pipeline
            }
        }
        Start-Sleep -Milliseconds 100
    }
}

Select-Properties: a helper for exporting DirectoryEntry objects

When you work with DirectoryEntry objects (AD or local accounts), you quickly run into the fact that their properties are actually PropertyValueCollections. This trips you up with certain operations, like exporting entries. For instance, the following will not work:

# Get some user accounts
$Users = ([ADSI]"WinNT://.").PSBase.Children | where {$_.PSBase.SchemaClassName -eq "User"}
# Export them to a CSV file
$Users | Export-Csv Users.csv

There are no errors, but the exported file will be filled with the text “System.DirectoryServices.PropertyValueCollection”. Not really what we were looking for. The solution is to let PowerShell turn those PropertyValueCollections into regular values. If we just want to get the names and descriptions, we can use:

$Users | select {$_.Name}, {$_.Description} | Export-Csv Users.csv

That’s good enough when you are in a hurry. But if you want the actual property names, and need many (or all) properties, this calls for a scripted solution. This is the version I came up with originally:

filter Select-Properties ($filter='*') {
    # Create a custom object to hold the requested properties
    $custom = New-Object PSObject
    # Get the properties of the object being processed
    $props = $_.PSBase.Properties
    # Add each property that matches our filter to the custom object
    $filter | foreach {
        $props.PropertyNames -like $_ | foreach {
            $custom | Add-Member NoteProperty $_ $($props.Item($_))
        }
    }
    # Return the custom object
    $custom
}

Now you can use

$Users | Select-Properties | Export-Csv Users.csv

and get the data you really wanted. You can also specify one or more specific properties (with wildcards), just like the Select-Object command. The script has a few drawbacks, but we will save that (and an improved script) for a future post.
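Because the filter accepts wildcards, you can also export just a subset of the properties, much like Select-Object (a sketch; the property names are examples):

```powershell
# Export only the name and any description-like properties
$Users | Select-Properties Name, Desc* | Export-Csv Users.csv -NoTypeInformation
```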

Arnoud


Show-Progress: a quick progress bar

Often I need to run some command on a large set of objects, say a WMI query on a few hundred PCs. Here is a simple script that provides an easy way to show a progress bar using the built-in Write-Progress cmdlet. In my PS profile it is aliased as "bar", so I can do the following:

Get-Content computers.txt | bar |% {Get-WMIObject Win32_ComputerSystem -ComputerName $_} | Export-Csv ComputerInfo.csv

One thing to keep in mind is that the script should be placed early in the pipeline, as it consumes all objects before passing them on in order to count them. This is fine as long as the preceding commands are quick, like getting names from files, AD, SQL etc. It does not matter what kind of objects are passed through the script.

Here is the script (Show-Progress.ps1):

# Store all pipeline objects in an array so we can count them
$InputArray = @($Input)
# Initialize the counter and length (divided by 100 so we get a percentage)
$counter = 1
$length = $InputArray.Length / 100
# Push the objects down the pipeline while updating our progress bar
$InputArray | foreach {
    Write-Progress "Processing..." "$_" -perc ([Int]$counter++/$length)
    $_
}

Enjoy,

Arnoud